RANGER-276 Add support for aggregating audit logs at source

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/236f1ba6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/236f1ba6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/236f1ba6

Branch: refs/heads/master
Commit: 236f1ba67287725544c5a2d07a688d0894de46c3
Parents: 3adafa4
Author: Don Bosco Durai <[email protected]>
Authored: Mon Apr 20 10:23:47 2015 -0700
Committer: Don Bosco Durai <[email protected]>
Committed: Mon Apr 20 10:23:47 2015 -0700

----------------------------------------------------------------------
 .../audit/destination/AuditDestination.java     |  71 ++
 .../audit/destination/FileAuditDestination.java | 231 +++++
 .../audit/destination/HDFSAuditDestination.java | 244 +++++
 .../ranger/audit/model/AuditEventBase.java      |   9 +-
 .../ranger/audit/model/AuthzAuditEvent.java     | 247 +++---
 .../ranger/audit/provider/AuditAsyncQueue.java  | 167 ----
 .../audit/provider/AuditBatchProcessor.java     | 327 -------
 .../ranger/audit/provider/AuditDestination.java |  70 --
 .../ranger/audit/provider/AuditFileSpool.java   | 875 ------------------
 .../audit/provider/AuditProviderFactory.java    |  64 +-
 .../audit/provider/BaseAuditProvider.java       |   1 +
 .../audit/provider/BufferedAuditProvider.java   |  36 +-
 .../ranger/audit/provider/DbAuditProvider.java  |  17 +-
 .../audit/provider/FileAuditDestination.java    | 230 -----
 .../audit/provider/HDFSAuditDestination.java    | 243 -----
 .../audit/provider/Log4jAuditProvider.java      |   1 +
 .../audit/provider/MultiDestAuditProvider.java  |   2 +
 .../ranger/audit/queue/AuditAsyncQueue.java     | 174 ++++
 .../ranger/audit/queue/AuditBatchQueue.java     | 346 ++++++++
 .../ranger/audit/queue/AuditFileSpool.java      | 884 +++++++++++++++++++
 .../ranger/audit/queue/AuditSummaryQueue.java   | 255 ++++++
 .../apache/ranger/audit/TestAuditProcessor.java | 786 -----------------
 .../org/apache/ranger/audit/TestAuditQueue.java | 704 +++++++++++++++
 .../org/apache/ranger/audit/TestConsumer.java   | 248 ++++++
 24 files changed, 3390 insertions(+), 2842 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
new file mode 100644
index 0000000..25c0220
--- /dev/null
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.destination;
+
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+
+/**
+ * This class needs to be extended by anyone who wants to build custom
+ * destination
+ */
+public abstract class AuditDestination extends BaseAuditProvider {
+       private static final Log logger = 
LogFactory.getLog(AuditDestination.class);
+
+       public AuditDestination() {
+               logger.info("AuditDestination() enter");
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties,
+        * java.lang.String)
+        */
+       @Override
+       public void init(Properties prop, String basePropertyName) {
+               super.init(prop, basePropertyName);
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+        */
+       @Override
+       public boolean isFlushPending() {
+               return false;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+        */
+       @Override
+       public void flush() {
+
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
new file mode 100644
index 0000000..1ccfd5f
--- /dev/null
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.destination;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+/**
+ * This class write the logs to local file
+ */
+public class FileAuditDestination extends AuditDestination {
+       private static final Log logger = LogFactory
+                       .getLog(FileAuditDestination.class);
+
+       public static final String PROP_FILE_LOCAL_DIR = "dir";
+       public static final String PROP_FILE_LOCAL_FILE_NAME_FORMAT = 
"filename.format";
+       public static final String PROP_FILE_FILE_ROLLOVER = 
"file.rollover.sec";
+
+       String baseFolder = null;
+       String fileFormat = null;
+       int fileRolloverSec = 24 * 60 * 60; // In seconds
+       private String logFileNameFormat;
+
+       boolean initDone = false;
+
+       private File logFolder;
+       PrintWriter logWriter = null;
+
+       private Date fileCreateTime = null;
+
+       private String currentFileName;
+
+       private boolean isStopped = false;
+
+       @Override
+       public void init(Properties prop, String propPrefix) {
+               super.init(prop, propPrefix);
+
+               // Initialize properties for this class
+               // Initial folder and file properties
+               String logFolderProp = MiscUtil.getStringProperty(props, 
propPrefix
+                               + "." + PROP_FILE_LOCAL_DIR);
+               logFileNameFormat = MiscUtil.getStringProperty(props, 
propPrefix + "."
+                               + PROP_FILE_LOCAL_FILE_NAME_FORMAT);
+               fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + 
"."
+                               + PROP_FILE_FILE_ROLLOVER, fileRolloverSec);
+
+               if (logFolderProp == null || logFolderProp.isEmpty()) {
+                       logger.error("File destination folder is not 
configured. Please set "
+                                       + propPrefix
+                                       + "."
+                                       + PROP_FILE_LOCAL_DIR
+                                       + ". name="
+                                       + getName());
+                       return;
+               }
+               logFolder = new File(logFolderProp);
+               if (!logFolder.isDirectory()) {
+                       logFolder.mkdirs();
+                       if (!logFolder.isDirectory()) {
+                               logger.error("FileDestination folder not found 
and can't be created. folder="
+                                               + logFolder.getAbsolutePath() + 
", name=" + getName());
+                               return;
+                       }
+               }
+               logger.info("logFolder=" + logFolder + ", name=" + getName());
+
+               if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
+                       logFileNameFormat = "%app-type%_ranger_audit.log";
+               }
+
+               logger.info("logFileNameFormat=" + logFileNameFormat + ", 
destName="
+                               + getName());
+
+               initDone = true;
+       }
+
+       @Override
+       public boolean logJSON(Collection<String> events) {
+               try {
+                       PrintWriter out = getLogFileStream();
+                       for (String event : events) {
+                               out.println(event);
+                       }
+                       out.flush();
+               } catch (Throwable t) {
+                       logError("Error writing to log file.", t);
+                       return false;
+               }
+               return true;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
+        */
+       @Override
+       synchronized public boolean log(Collection<AuditEventBase> events) {
+               if (isStopped) {
+                       logError("log() called after stop was requested. name=" 
+ getName());
+                       return false;
+               }
+               List<String> jsonList = new ArrayList<String>();
+               for (AuditEventBase event : events) {
+                       try {
+                               jsonList.add(MiscUtil.stringify(event));
+                       } catch (Throwable t) {
+                               logger.error("Error converting to JSON. event=" 
+ event);
+                       }
+               }
+               return logJSON(jsonList);
+
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#start()
+        */
+       @Override
+       public void start() {
+               // Nothing to do here. We will open the file when the first log 
request
+               // comes
+       }
+
+       @Override
+       synchronized public void stop() {
+               if (logWriter != null) {
+                       logWriter.flush();
+                       logWriter.close();
+                       logWriter = null;
+                       isStopped = true;
+               }
+       }
+
+       // Helper methods in this class
+       synchronized private PrintWriter getLogFileStream() throws Exception {
+               closeFileIfNeeded();
+
+               // Either there are no open log file or the previous one has 
been rolled
+               // over
+               if (logWriter == null) {
+                       Date currentTime = new Date();
+                       // Create a new file
+                       String fileName = 
MiscUtil.replaceTokens(logFileNameFormat,
+                                       currentTime.getTime());
+                       File outLogFile = new File(logFolder, fileName);
+                       if (outLogFile.exists()) {
+                               // Let's try to get the next available file
+                               int i = 0;
+                               while (true) {
+                                       i++;
+                                       int lastDot = fileName.lastIndexOf('.');
+                                       String baseName = fileName.substring(0, 
lastDot);
+                                       String extension = 
fileName.substring(lastDot);
+                                       String newFileName = baseName + "." + i 
+ extension;
+                                       File newLogFile = new File(logFolder, 
newFileName);
+                                       if (!newLogFile.exists()) {
+                                               // Move the file
+                                               if 
(!outLogFile.renameTo(newLogFile)) {
+                                                       logger.error("Error 
renameing file. " + outLogFile
+                                                                       + " to 
" + newLogFile);
+                                               }
+                                               break;
+                                       }
+                               }
+                       }
+                       if (!outLogFile.exists()) {
+                               logger.info("Creating new file. destName=" + 
getName()
+                                               + ", fileName=" + fileName);
+                               // Open the file
+                               logWriter = new PrintWriter(new 
BufferedWriter(new FileWriter(
+                                               outLogFile)));
+                       } else {
+                               logWriter = new PrintWriter(new 
BufferedWriter(new FileWriter(
+                                               outLogFile, true)));
+                       }
+                       fileCreateTime = new Date();
+                       currentFileName = outLogFile.getPath();
+               }
+               return logWriter;
+       }
+
+       private void closeFileIfNeeded() throws FileNotFoundException, 
IOException {
+               if (logWriter == null) {
+                       return;
+               }
+               if (System.currentTimeMillis() - fileCreateTime.getTime() > 
fileRolloverSec * 1000) {
+                       logger.info("Closing file. Rolling over. name=" + 
getName()
+                                       + ", fileName=" + currentFileName);
+                       logWriter.flush();
+                       logWriter.close();
+                       logWriter = null;
+                       currentFileName = null;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
new file mode 100644
index 0000000..706eb8e
--- /dev/null
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.destination;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+/**
+ * This class write the logs to local file
+ */
+public class HDFSAuditDestination extends AuditDestination {
+       private static final Log logger = LogFactory
+                       .getLog(HDFSAuditDestination.class);
+
+       public static final String PROP_HDFS_DIR = "dir";
+       public static final String PROP_HDFS_SUBDIR = "subdir";
+       public static final String PROP_HDFS_FILE_NAME_FORMAT = 
"filename.format";
+       public static final String PROP_HDFS_ROLLOVER = "file.rollover.sec";
+
+       String baseFolder = null;
+       String fileFormat = null;
+       int fileRolloverSec = 24 * 60 * 60; // In seconds
+       private String logFileNameFormat;
+
+       boolean initDone = false;
+
+       private String logFolder;
+       PrintWriter logWriter = null;
+
+       private Date fileCreateTime = null;
+
+       private String currentFileName;
+
+       private boolean isStopped = false;
+
+       @Override
+       public void init(Properties prop, String propPrefix) {
+               super.init(prop, propPrefix);
+
+               // Initialize properties for this class
+               // Initial folder and file properties
+               String logFolderProp = MiscUtil.getStringProperty(props, 
propPrefix
+                               + "." + PROP_HDFS_DIR);
+               String logSubFolder = MiscUtil.getStringProperty(props, 
propPrefix
+                               + "." + PROP_HDFS_SUBDIR);
+               if (logSubFolder == null || logSubFolder.isEmpty()) {
+                       logSubFolder = "%app-type%/%time:yyyyMMdd%";
+               }
+
+               logFileNameFormat = MiscUtil.getStringProperty(props, 
propPrefix + "."
+                               + PROP_HDFS_FILE_NAME_FORMAT);
+               fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + 
"."
+                               + PROP_HDFS_ROLLOVER, fileRolloverSec);
+
+               if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
+                       logFileNameFormat = 
"%app-type%_ranger_audit_%hostname%" + ".log";
+               }
+
+               if (logFolderProp == null || logFolderProp.isEmpty()) {
+                       logger.fatal("File destination folder is not 
configured. Please set "
+                                       + propPrefix + "." + PROP_HDFS_DIR + ". 
name=" + getName());
+                       return;
+               }
+
+               logFolder = logFolderProp + "/" + logSubFolder;
+               logger.info("logFolder=" + logFolder + ", destName=" + 
getName());
+               logger.info("logFileNameFormat=" + logFileNameFormat + ", 
destName="
+                               + getName());
+
+               initDone = true;
+       }
+
+       @Override
+       public boolean logJSON(Collection<String> events) {
+               try {
+                       PrintWriter out = getLogFileStream();
+                       for (String event : events) {
+                               out.println(event);
+                       }
+                       out.flush();
+               } catch (Throwable t) {
+                       logError("Error writing to log file.", t);
+                       return false;
+               }
+               return true;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection)
+        */
+       @Override
+       synchronized public boolean log(Collection<AuditEventBase> events) {
+               if (isStopped) {
+                       logError("log() called after stop was requested. name=" 
+ getName());
+                       return false;
+               }
+               List<String> jsonList = new ArrayList<String>();
+               for (AuditEventBase event : events) {
+                       try {
+                               jsonList.add(MiscUtil.stringify(event));
+                       } catch (Throwable t) {
+                               logger.error("Error converting to JSON. event=" 
+ event);
+                       }
+               }
+               return logJSON(jsonList);
+
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#start()
+        */
+       @Override
+       public void start() {
+               // Nothing to do here. We will open the file when the first log 
request
+               // comes
+       }
+
+       @Override
+       synchronized public void stop() {
+               try {
+                       if (logWriter != null) {
+                               logWriter.flush();
+                               logWriter.close();
+                               logWriter = null;
+                               isStopped = true;
+                       }
+               } catch (Throwable t) {
+                       logger.error("Error closing HDFS file.", t);
+               }
+       }
+
+       // Helper methods in this class
+       synchronized private PrintWriter getLogFileStream() throws Throwable {
+               closeFileIfNeeded();
+
+               // Either there are no open log file or the previous one has 
been rolled
+               // over
+               if (logWriter == null) {
+                       Date currentTime = new Date();
+                       // Create a new file
+                       String fileName = 
MiscUtil.replaceTokens(logFileNameFormat,
+                                       currentTime.getTime());
+                       String parentFolder = MiscUtil.replaceTokens(logFolder,
+                                       currentTime.getTime());
+                       Configuration conf = new Configuration();
+
+                       String fullPath = parentFolder
+                                       + org.apache.hadoop.fs.Path.SEPARATOR + 
fileName;
+                       String defaultPath = fullPath;
+                       URI uri = URI.create(fullPath);
+                       FileSystem fileSystem = FileSystem.get(uri, conf);
+
+                       Path hdfPath = new Path(fullPath);
+                       logger.info("Checking whether log file exists. 
hdfPath=" + fullPath);
+                       int i = 0;
+                       while (fileSystem.exists(hdfPath)) {
+                               i++;
+                               int lastDot = defaultPath.lastIndexOf('.');
+                               String baseName = defaultPath.substring(0, 
lastDot);
+                               String extension = 
defaultPath.substring(lastDot);
+                               fullPath = baseName + "." + i + extension;
+                               hdfPath = new Path(fullPath);
+                               logger.info("Checking whether log file exists. 
hdfPath=" + fullPath);
+                       }
+                       logger.info("Log file doesn't exists. Will create and 
use it. hdfPath=" + fullPath);
+                       // Create parent folders
+                       createParents(hdfPath, fileSystem);
+
+                       // Create the file to write
+                       logger.info("Creating new log file. hdfPath=" + 
fullPath);
+                       FSDataOutputStream ostream = fileSystem.create(hdfPath);
+                       logWriter = new PrintWriter(ostream);
+                       fileCreateTime = new Date();
+                       currentFileName = fullPath;
+               }
+               return logWriter;
+       }
+
+       private void createParents(Path pathLogfile, FileSystem fileSystem)
+                       throws Throwable {
+               logger.info("Creating parent folder for " + pathLogfile);
+               Path parentPath = pathLogfile != null ? pathLogfile.getParent() 
: null;
+
+               if (parentPath != null && fileSystem != null
+                               && !fileSystem.exists(parentPath)) {
+                       fileSystem.mkdirs(parentPath);
+               }
+       }
+
+       private void closeFileIfNeeded() throws FileNotFoundException, 
IOException {
+               if (logWriter == null) {
+                       return;
+               }
+               // TODO: Close the file on absolute time. Currently it is 
implemented as
+               // relative time
+               if (System.currentTimeMillis() - fileCreateTime.getTime() > 
fileRolloverSec * 1000) {
+                       logger.info("Closing file. Rolling over. name=" + 
getName()
+                                       + ", fileName=" + currentFileName);
+                       logWriter.flush();
+                       logWriter.close();
+                       logWriter = null;
+                       currentFileName = null;
+               }
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
index a44e047..39a2578 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java
@@ -19,6 +19,8 @@
 
 package org.apache.ranger.audit.model;
 
+import java.util.Date;
+
 import org.apache.ranger.audit.dao.DaoManager;
 
 public abstract class AuditEventBase {
@@ -27,7 +29,12 @@ public abstract class AuditEventBase {
        }
 
        public abstract void persist(DaoManager daoManager);
-
+       
+       public abstract String getEventKey();
+       public abstract Date getEventTime ();
+       public abstract void setEventCount(long frequencyCount);
+       public abstract void setEventDurationMS(long frequencyDurationMS);
+       
        protected String trim(String str, int len) {
                String ret = str;
                if (str != null) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
index af89f60..d648de3 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java
@@ -24,87 +24,87 @@ import java.util.Date;
 import org.apache.ranger.audit.dao.DaoManager;
 import org.apache.ranger.audit.entity.AuthzAuditEventDbObj;
 
-import com.google.gson.Gson;
 import com.google.gson.annotations.SerializedName;
 
-
 public class AuthzAuditEvent extends AuditEventBase {
        protected static String FIELD_SEPARATOR = ";";
 
-       protected static final int MAX_ACTION_FIELD_SIZE       = 1800 ;
-       protected static final int MAX_REQUEST_DATA_FIELD_SIZE = 1800 ;
+       protected static final int MAX_ACTION_FIELD_SIZE = 1800;
+       protected static final int MAX_REQUEST_DATA_FIELD_SIZE = 1800;
 
        @SerializedName("repoType")
-       protected int    repositoryType = 0;
+       protected int repositoryType = 0;
 
        @SerializedName("repo")
        protected String repositoryName = null;
 
        @SerializedName("reqUser")
-       protected String user           = null;
+       protected String user = null;
 
        @SerializedName("evtTime")
-       protected Date   eventTime      = new Date();
+       protected Date eventTime = new Date();
 
        @SerializedName("access")
-       protected String accessType     = null;
+       protected String accessType = null;
 
        @SerializedName("resource")
-       protected String resourcePath   = null;
+       protected String resourcePath = null;
 
        @SerializedName("resType")
-       protected String resourceType   = null;
+       protected String resourceType = null;
 
        @SerializedName("action")
-       protected String action         = null;
+       protected String action = null;
 
        @SerializedName("result")
-       protected short  accessResult   = 0; // 0 - DENIED; 1 - ALLOWED; HTTP 
return code
+       protected short accessResult = 0; // 0 - DENIED; 1 - ALLOWED; HTTP 
return
+                                                                               
// code
 
        @SerializedName("agent")
-       protected String agentId        = null;
+       protected String agentId = null;
 
        @SerializedName("policy")
-       protected long   policyId       = 0;
+       protected long policyId = 0;
 
        @SerializedName("reason")
-       protected String resultReason   = null;
+       protected String resultReason = null;
 
        @SerializedName("enforcer")
-       protected String aclEnforcer    = null;
+       protected String aclEnforcer = null;
 
        @SerializedName("sess")
-       protected String sessionId      = null;
+       protected String sessionId = null;
 
        @SerializedName("cliType")
-       protected String clientType     = null;
+       protected String clientType = null;
 
        @SerializedName("cliIP")
-       protected String clientIP       = null;
+       protected String clientIP = null;
 
        @SerializedName("reqData")
-       protected String requestData    = null;
+       protected String requestData = null;
 
        @SerializedName("agentHost")
-       protected String agentHostname  = null;
+       protected String agentHostname = null;
 
        @SerializedName("logType")
-       protected String logType        = null;
+       protected String logType = null;
 
        @SerializedName("id")
-       protected String eventId        = null;
+       protected String eventId = null;
 
        /**
-        * This to ensure order within a session. Order not guaranteed across 
processes and hosts 
+        * This to ensure order within a session. Order not guaranteed across
+        * processes and hosts
         */
        @SerializedName("seq_num")
        protected long seqNum = 0;
 
-       @SerializedName("freq_count")
-       protected long frequencyCount = 1;
+       @SerializedName("event_count")
+       protected long eventCount = 1;
 
-       @SerializedName("freq_dur_ms")
-       protected long frequencyDurationMS = 0;
+       @SerializedName("event_dur_ms")
+       protected long eventDurationMS = 0;
 
        public AuthzAuditEvent() {
                super();
@@ -112,40 +112,29 @@ public class AuthzAuditEvent extends AuditEventBase {
                this.repositoryType = 0;
        }
 
-       public AuthzAuditEvent(int    repositoryType,
-                                                  String repositoryName,
-                                                  String user,
-                                                  Date   eventTime,
-                                                  String accessType,
-                                                  String resourcePath,
-                                                  String resourceType,
-                                                  String action,
-                                                  short  accessResult,
-                                                  String agentId,
-                                                  long   policyId,
-                                                  String resultReason,
-                                                  String aclEnforcer,
-                                                  String sessionId,
-                                                  String clientType,
-                                                  String clientIP,
-                                                  String requestData) {
+       public AuthzAuditEvent(int repositoryType, String repositoryName,
+                       String user, Date eventTime, String accessType,
+                       String resourcePath, String resourceType, String action,
+                       short accessResult, String agentId, long policyId,
+                       String resultReason, String aclEnforcer, String 
sessionId,
+                       String clientType, String clientIP, String requestData) 
{
                this.repositoryType = repositoryType;
                this.repositoryName = repositoryName;
-               this.user           = user;
-               this.eventTime      = eventTime;
-               this.accessType     = accessType;
-               this.resourcePath   = resourcePath;
-               this.resourceType   = resourceType;
-               this.action         = action;
-               this.accessResult   = accessResult;
-               this.agentId        = agentId;
-               this.policyId       = policyId;
-               this.resultReason   = resultReason;
-               this.aclEnforcer    = aclEnforcer;
-               this.sessionId      = sessionId;
-               this.clientType     = clientType;
-               this.clientIP       = clientIP;
-               this.requestData    = requestData;
+               this.user = user;
+               this.eventTime = eventTime;
+               this.accessType = accessType;
+               this.resourcePath = resourcePath;
+               this.resourceType = resourceType;
+               this.action = action;
+               this.accessResult = accessResult;
+               this.agentId = agentId;
+               this.policyId = policyId;
+               this.resultReason = resultReason;
+               this.aclEnforcer = aclEnforcer;
+               this.sessionId = sessionId;
+               this.clientType = clientType;
+               this.clientIP = clientIP;
+               this.requestData = requestData;
        }
 
        /**
@@ -156,7 +145,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param repositoryType the repositoryType to set
+        * @param repositoryType
+        *            the repositoryType to set
         */
        public void setRepositoryType(int repositoryType) {
                this.repositoryType = repositoryType;
@@ -170,7 +160,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param repositoryName the repositoryName to set
+        * @param repositoryName
+        *            the repositoryName to set
         */
        public void setRepositoryName(String repositoryName) {
                this.repositoryName = repositoryName;
@@ -184,7 +175,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param user the user to set
+        * @param user
+        *            the user to set
         */
        public void setUser(String user) {
                this.user = user;
@@ -198,7 +190,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param timeStamp the timeStamp to set
+        * @param timeStamp
+        *            the timeStamp to set
         */
        public void setEventTime(Date eventTime) {
                this.eventTime = eventTime;
@@ -212,7 +205,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param accessType the accessType to set
+        * @param accessType
+        *            the accessType to set
         */
        public void setAccessType(String accessType) {
                this.accessType = accessType;
@@ -226,7 +220,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param resourcePath the resourcePath to set
+        * @param resourcePath
+        *            the resourcePath to set
         */
        public void setResourcePath(String resourcePath) {
                this.resourcePath = resourcePath;
@@ -240,7 +235,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param resourceType the resourceType to set
+        * @param resourceType
+        *            the resourceType to set
         */
        public void setResourceType(String resourceType) {
                this.resourceType = resourceType;
@@ -250,11 +246,12 @@ public class AuthzAuditEvent extends AuditEventBase {
         * @return the action
         */
        public String getAction() {
-               return trim(action, MAX_ACTION_FIELD_SIZE) ;
+               return trim(action, MAX_ACTION_FIELD_SIZE);
        }
 
        /**
-        * @param action the action to set
+        * @param action
+        *            the action to set
         */
        public void setAction(String action) {
                this.action = action;
@@ -268,7 +265,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param accessResult the accessResult to set
+        * @param accessResult
+        *            the accessResult to set
         */
        public void setAccessResult(short accessResult) {
                this.accessResult = accessResult;
@@ -282,7 +280,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param agentId the agentId to set
+        * @param agentId
+        *            the agentId to set
         */
        public void setAgentId(String agentId) {
                this.agentId = agentId;
@@ -296,7 +295,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param policyId the policyId to set
+        * @param policyId
+        *            the policyId to set
         */
        public void setPolicyId(long policyId) {
                this.policyId = policyId;
@@ -310,7 +310,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param resultReason the resultReason to set
+        * @param resultReason
+        *            the resultReason to set
         */
        public void setResultReason(String resultReason) {
                this.resultReason = resultReason;
@@ -324,7 +325,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param aclEnforcer the aclEnforcer to set
+        * @param aclEnforcer
+        *            the aclEnforcer to set
         */
        public void setAclEnforcer(String aclEnforcer) {
                this.aclEnforcer = aclEnforcer;
@@ -338,7 +340,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param sessionId the sessionId to set
+        * @param sessionId
+        *            the sessionId to set
         */
        public void setSessionId(String sessionId) {
                this.sessionId = sessionId;
@@ -352,7 +355,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param clientType the clientType to set
+        * @param clientType
+        *            the clientType to set
         */
        public void setClientType(String clientType) {
                this.clientType = clientType;
@@ -366,7 +370,8 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        /**
-        * @param clientIP the clientIP to set
+        * @param clientIP
+        *            the clientIP to set
         */
        public void setClientIP(String clientIP) {
                this.clientIP = clientIP;
@@ -376,11 +381,12 @@ public class AuthzAuditEvent extends AuditEventBase {
         * @return the requestData
         */
        public String getRequestData() {
-               return trim(requestData, MAX_REQUEST_DATA_FIELD_SIZE) ;
+               return trim(requestData, MAX_REQUEST_DATA_FIELD_SIZE);
        }
 
        /**
-        * @param requestData the requestData to set
+        * @param requestData
+        *            the requestData to set
         */
        public void setRequestData(String requestData) {
                this.requestData = requestData;
@@ -410,8 +416,6 @@ public class AuthzAuditEvent extends AuditEventBase {
                this.eventId = eventId;
        }
 
-
-       
        public long getSeqNum() {
                return seqNum;
        }
@@ -420,20 +424,28 @@ public class AuthzAuditEvent extends AuditEventBase {
                this.seqNum = seqNum;
        }
 
-       public long getFrequencyCount() {
-               return frequencyCount;
+       public long getEventCount() {
+               return eventCount;
        }
 
-       public void setFrequencyCount(long frequencyCount) {
-               this.frequencyCount = frequencyCount;
+       public void setEventCount(long frequencyCount) {
+               this.eventCount = frequencyCount;
        }
 
-       public long getFrequencyDurationMS() {
-               return frequencyDurationMS;
+       public long getEventDurationMS() {
+               return eventDurationMS;
        }
 
-       public void setFrequencyDurationMS(long frequencyDurationMS) {
-               this.frequencyDurationMS = frequencyDurationMS;
+       public void setEventDurationMS(long frequencyDurationMS) {
+               this.eventDurationMS = frequencyDurationMS;
+       }
+
+       @Override
+       public String getEventKey() {
+               String key = user + "^" + accessType + "^" + resourcePath + "^"
+                               + resourceType + "^" + action + "^" + 
accessResult + "^"
+                               + sessionId + "^" + clientIP;
+               return key;
        }
 
        @Override
@@ -448,35 +460,42 @@ public class AuthzAuditEvent extends AuditEventBase {
        }
 
        protected StringBuilder toString(StringBuilder sb) {
-               
sb.append("repositoryType=").append(repositoryType).append(FIELD_SEPARATOR)
-                 
.append("repositoryName=").append(repositoryName).append(FIELD_SEPARATOR)
-                 .append("user=").append(user).append(FIELD_SEPARATOR)
-                 
.append("eventTime=").append(eventTime).append(FIELD_SEPARATOR)
-                 
.append("accessType=").append(accessType).append(FIELD_SEPARATOR)
-                 
.append("resourcePath=").append(resourcePath).append(FIELD_SEPARATOR)
-                 
.append("resourceType=").append(resourceType).append(FIELD_SEPARATOR)
-                 .append("action=").append(action).append(FIELD_SEPARATOR)
-                 
.append("accessResult=").append(accessResult).append(FIELD_SEPARATOR)
-                 .append("agentId=").append(agentId).append(FIELD_SEPARATOR)
-                 .append("policyId=").append(policyId).append(FIELD_SEPARATOR)
-                 
.append("resultReason=").append(resultReason).append(FIELD_SEPARATOR)
-                 
.append("aclEnforcer=").append(aclEnforcer).append(FIELD_SEPARATOR)
-                 
.append("sessionId=").append(sessionId).append(FIELD_SEPARATOR)
-                 
.append("clientType=").append(clientType).append(FIELD_SEPARATOR)
-                 .append("clientIP=").append(clientIP).append(FIELD_SEPARATOR)
-                 
.append("requestData=").append(requestData).append(FIELD_SEPARATOR)
-                 
.append("agentHostname=").append(agentHostname).append(FIELD_SEPARATOR)
-                 .append("logType=").append(logType).append(FIELD_SEPARATOR)
-                 .append("eventId=").append(eventId).append(FIELD_SEPARATOR)
-                 .append("seq_num=").append(seqNum).append(FIELD_SEPARATOR)
-                 
.append("freq_count=").append(frequencyCount).append(FIELD_SEPARATOR)
-                 
.append("freq_dur_ms=").append(frequencyDurationMS).append(FIELD_SEPARATOR)
-               ;
+               sb.append("repositoryType=").append(repositoryType)
+                               
.append(FIELD_SEPARATOR).append("repositoryName=")
+                               
.append(repositoryName).append(FIELD_SEPARATOR).append("user=")
+                               
.append(user).append(FIELD_SEPARATOR).append("eventTime=")
+                               .append(eventTime).append(FIELD_SEPARATOR)
+                               .append("accessType=").append(accessType)
+                               .append(FIELD_SEPARATOR).append("resourcePath=")
+                               .append(resourcePath).append(FIELD_SEPARATOR)
+                               .append("resourceType=").append(resourceType)
+                               
.append(FIELD_SEPARATOR).append("action=").append(action)
+                               .append(FIELD_SEPARATOR).append("accessResult=")
+                               .append(accessResult).append(FIELD_SEPARATOR)
+                               
.append("agentId=").append(agentId).append(FIELD_SEPARATOR)
+                               
.append("policyId=").append(policyId).append(FIELD_SEPARATOR)
+                               .append("resultReason=").append(resultReason)
+                               .append(FIELD_SEPARATOR).append("aclEnforcer=")
+                               .append(aclEnforcer).append(FIELD_SEPARATOR)
+                               
.append("sessionId=").append(sessionId).append(FIELD_SEPARATOR)
+                               .append("clientType=").append(clientType)
+                               
.append(FIELD_SEPARATOR).append("clientIP=").append(clientIP)
+                               .append(FIELD_SEPARATOR).append("requestData=")
+                               .append(requestData).append(FIELD_SEPARATOR)
+                               .append("agentHostname=").append(agentHostname)
+                               
.append(FIELD_SEPARATOR).append("logType=").append(logType)
+                               
.append(FIELD_SEPARATOR).append("eventId=").append(eventId)
+                               
.append(FIELD_SEPARATOR).append("seq_num=").append(seqNum)
+                               .append(FIELD_SEPARATOR).append("event_count=")
+                               .append(eventCount).append(FIELD_SEPARATOR)
+                               .append("event_dur_ms=").append(eventDurationMS)
+                               .append(FIELD_SEPARATOR);
                return sb;
        }
 
        @Override
        public void persist(DaoManager daoManager) {
-               daoManager.getAuthzAuditEventDao().create(new 
AuthzAuditEventDbObj(this));
+               daoManager.getAuthzAuditEventDao().create(
+                               new AuthzAuditEventDbObj(this));
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
deleted file mode 100644
index 5553bcc..0000000
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java
+++ /dev/null
@@ -1,167 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.audit.provider;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.concurrent.LinkedTransferQueue;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.audit.model.AuditEventBase;
-
-/**
- * This is a non-blocking queue with no limit on capacity.
- */
-public class AuditAsyncQueue extends BaseAuditProvider implements Runnable {
-       private static final Log logger = 
LogFactory.getLog(AuditAsyncQueue.class);
-
-       LinkedTransferQueue<AuditEventBase> queue = new 
LinkedTransferQueue<AuditEventBase>();
-       Thread consumerThread = null;
-
-       static int threadCount = 0;
-       static final String DEFAULT_NAME = "async";
-
-       public AuditAsyncQueue() {
-               setName(DEFAULT_NAME);
-       }
-
-       public AuditAsyncQueue(AuditProvider consumer) {
-               super(consumer);
-               setName(DEFAULT_NAME);
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see
-        * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
-        * audit.model.AuditEventBase)
-        */
-       @Override
-       public boolean log(AuditEventBase event) {
-               // Add to the queue and return ASAP
-               if (queue.size() >= getMaxQueueSize()) {
-                       return false;
-               }
-               queue.add(event);
-               addLifeTimeInLogCount(1);
-               return true;
-       }
-
-       @Override
-       public boolean log(Collection<AuditEventBase> events) {
-               for (AuditEventBase event : events) {
-                       log(event);
-               }
-               return true;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#start()
-        */
-       @Override
-       public void start() {
-               if(consumer != null) {
-                       consumer.start();
-               }
-               
-               consumerThread = new Thread(this, this.getClass().getName()
-                               + (threadCount++));
-               consumerThread.setDaemon(true);
-               consumerThread.start();
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#stop()
-        */
-       @Override
-       public void stop() {
-               setDrain(true);
-               try {
-                       consumerThread.interrupt();
-               } catch (Throwable t) {
-                       // ignore any exception
-               }
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
-        */
-       @Override
-       public boolean isFlushPending() {
-               if (queue.isEmpty()) {
-                       return consumer.isFlushPending();
-               }
-               return true;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see java.lang.Runnable#run()
-        */
-       @Override
-       public void run() {
-               while (true) {
-                       try {
-                               AuditEventBase event = null;
-                               if (!isDrain()) {
-                                       // For Transfer queue take() is blocking
-                                       event = queue.take();
-                               } else {
-                                       // For Transfer queue poll() is non 
blocking
-                                       event = queue.poll();
-                               }
-                               if (event != null) {
-                                       Collection<AuditEventBase> eventList = 
new ArrayList<AuditEventBase>();
-                                       eventList.add(event);
-                                       // TODO: Put a limit. Hard coding to 
1000 (use batch size
-                                       // property)
-                                       queue.drainTo(eventList, 1000 - 1);
-                                       consumer.log(eventList);
-                                       eventList.clear();
-                               }
-                       } catch (InterruptedException e) {
-                               logger.info(
-                                               "Caught exception in consumer 
thread. Mostly to about loop",
-                                               e);
-                       } catch (Throwable t) {
-                               logger.error("Caught error during processing 
request.", t);
-                       }
-                       if (isDrain() && queue.isEmpty()) {
-                               break;
-                       }
-               }
-               try {
-                       // Call stop on the consumer
-                       consumer.stop();
-               } catch (Throwable t) {
-                       logger.error("Error while calling stop on consumer.", 
t);
-               }
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
deleted file mode 100644
index 58d122a..0000000
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java
+++ /dev/null
@@ -1,327 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.audit.provider;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Properties;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.audit.model.AuditEventBase;
-
-public class AuditBatchProcessor extends BaseAuditProvider implements Runnable 
{
-       private static final Log logger = LogFactory
-                       .getLog(AuditBatchProcessor.class);
-
-       private BlockingQueue<AuditEventBase> queue = null;
-       private Collection<AuditEventBase> localBatchBuffer = new 
ArrayList<AuditEventBase>();
-
-       Thread consumerThread = null;
-       static int threadCount = 0;
-
-       public AuditBatchProcessor() {
-       }
-
-       public AuditBatchProcessor(AuditProvider consumer) {
-               super(consumer);
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see
-        * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger.
-        * audit.model.AuditEventBase)
-        */
-       @Override
-       public boolean log(AuditEventBase event) {
-               // Add to batchQueue. Block if full
-               queue.add(event);
-               addLifeTimeInLogCount(1);
-               return true;
-       }
-
-       @Override
-       public boolean log(Collection<AuditEventBase> events) {
-               for (AuditEventBase event : events) {
-                       log(event);
-               }
-               return true;
-       }
-
-       @Override
-       public void init(Properties prop, String basePropertyName) {
-               String propPrefix = "xasecure.audit.batch";
-               if (basePropertyName != null) {
-                       propPrefix = basePropertyName;
-               }
-
-               super.init(prop, propPrefix);
-
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#start()
-        */
-       @Override
-       synchronized public void start() {
-               if (consumerThread != null) {
-                       logger.error("Provider is already started. name=" + 
getName());
-                       return;
-               }
-               logger.info("Creating ArrayBlockingQueue with maxSize="
-                               + getMaxQueueSize());
-               queue = new 
ArrayBlockingQueue<AuditEventBase>(getMaxQueueSize());
-
-               // Start the consumer first
-               consumer.start();
-
-               // Then the FileSpooler
-               if (fileSpoolerEnabled) {
-                       fileSpooler.start();
-               }
-
-               // Finally the queue listener
-               consumerThread = new Thread(this, this.getClass().getName()
-                               + (threadCount++));
-               consumerThread.setDaemon(true);
-               consumerThread.start();
-
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#stop()
-        */
-       @Override
-       public void stop() {
-               setDrain(true);
-               flush();
-               try {
-                       consumerThread.interrupt();
-               } catch (Throwable t) {
-                       // ignore any exception
-               }
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
-        */
-       @Override
-       public void waitToComplete() {
-               int defaultTimeOut = -1;
-               waitToComplete(defaultTimeOut);
-               consumer.waitToComplete(defaultTimeOut);
-       }
-
-       @Override
-       public void waitToComplete(long timeout) {
-               setDrain(true);
-               flush();
-               long sleepTime = 1000;
-               long startTime = System.currentTimeMillis();
-               int prevQueueSize = -1;
-               int staticLoopCount = 0;
-               while ((queue.size() > 0 || localBatchBuffer.size() > 0)) {
-                       if (prevQueueSize == queue.size()) {
-                               logger.error("Queue size is not changing. " + 
getName()
-                                               + ".size=" + queue.size());
-                               staticLoopCount++;
-                               if (staticLoopCount > 5) {
-                                       logger.error("Aborting writing to 
consumer. Some logs will be discarded."
-                                                       + getName() + ".size=" 
+ queue.size());
-                               }
-                       } else {
-                               staticLoopCount = 0;
-                       }
-                       consumerThread.interrupt();
-                       try {
-                               Thread.sleep(sleepTime);
-                               if (timeout > 0
-                                               && (System.currentTimeMillis() 
- startTime > timeout)) {
-                                       break;
-                               }
-                       } catch (InterruptedException e) {
-                               break;
-                       }
-               }
-               consumer.waitToComplete(timeout);
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
-        */
-       @Override
-       public boolean isFlushPending() {
-               if (queue.isEmpty()) {
-                       return consumer.isFlushPending();
-               }
-               return true;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#flush()
-        */
-       @Override
-       public void flush() {
-               if (fileSpoolerEnabled) {
-                       fileSpooler.flush();
-               }
-               consumer.flush();
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see java.lang.Runnable#run()
-        */
-       @Override
-       public void run() {
-               long lastDispatchTime = System.currentTimeMillis();
-               boolean isDestActive = true;
-               while (true) {
-                       // Time to next dispatch
-                       long nextDispatchDuration = lastDispatchTime
-                                       - System.currentTimeMillis() + 
getMaxBatchInterval();
-
-                       boolean isToSpool = false;
-                       boolean fileSpoolDrain = false;
-                       try {
-                               if (fileSpoolerEnabled && 
fileSpooler.isPending()) {
-                                       int percentUsed = (getMaxQueueSize() - 
queue.size()) * 100
-                                                       / getMaxQueueSize();
-                                       long lastAttemptDelta = fileSpooler
-                                                       
.getLastAttemptTimeDelta();
-
-                                       fileSpoolDrain = lastAttemptDelta > 
fileSpoolMaxWaitTime;
-                                       // If we should even read from queue?
-                                       if (!isDrain() && !fileSpoolDrain
-                                                       && percentUsed < 
fileSpoolDrainThresholdPercent) {
-                                               // Since some files are still 
under progress and it is
-                                               // not in drain mode, lets wait 
and retry
-                                               if (nextDispatchDuration > 0) {
-                                                       
Thread.sleep(nextDispatchDuration);
-                                               }
-                                               continue;
-                                       }
-                                       isToSpool = true;
-                               }
-
-                               AuditEventBase event = null;
-
-                               if (!isToSpool && !isDrain() && !fileSpoolDrain
-                                               && nextDispatchDuration > 0) {
-                                       event = queue.poll(nextDispatchDuration,
-                                                       TimeUnit.MILLISECONDS);
-
-                               } else {
-                                       // For poll() is non blocking
-                                       event = queue.poll();
-                               }
-                               if (event != null) {
-                                       localBatchBuffer.add(event);
-                                       if (getMaxBatchSize() >= 
localBatchBuffer.size()) {
-                                               queue.drainTo(localBatchBuffer, 
getMaxBatchSize()
-                                                               - 
localBatchBuffer.size());
-                                       }
-                               }
-                       } catch (InterruptedException e) {
-                               logger.info(
-                                               "Caught exception in consumer 
thread. Mostly to abort loop",
-                                               e);
-                       } catch (Throwable t) {
-                               logger.error("Caught error during processing 
request.", t);
-                       }
-
-                       if (localBatchBuffer.size() > 0 && isToSpool) {
-                               // Let spool to the file directly
-                               if (isDestActive) {
-                                       logger.info("Switching to file spool. 
Queue=" + getName()
-                                                       + ", dest=" + 
consumer.getName());
-                               }
-                               isDestActive = false;
-                               fileSpooler.stashLogs(localBatchBuffer);
-                               localBatchBuffer.clear();
-                               // Reset all variables
-                               lastDispatchTime = System.currentTimeMillis();
-                       } else if (localBatchBuffer.size() > 0
-                                       && (isDrain()
-                                                       || 
localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) {
-                               if (fileSpoolerEnabled && !isDestActive) {
-                                       logger.info("Switching to writing to 
destination. Queue="
-                                                       + getName() + ", dest=" 
+ consumer.getName());
-                               }
-                               boolean ret = consumer.log(localBatchBuffer);
-                               if (!ret) {
-                                       if (fileSpoolerEnabled) {
-                                               logger.info("Switching to file 
spool. Queue="
-                                                               + getName() + 
", dest=" + consumer.getName());
-                                               // Transient error. Stash and 
move on
-                                               
fileSpooler.stashLogs(localBatchBuffer);
-                                               isDestActive = false;
-                                       } else {
-                                               // We need to drop this event
-                                               
logFailedEvent(localBatchBuffer, null);
-                                       }
-                               } else {
-                                       isDestActive = true;
-                               }
-                               localBatchBuffer.clear();
-                               // Reset all variables
-                               lastDispatchTime = System.currentTimeMillis();
-                       }
-
-                       if (isDrain()) {
-                               if (!queue.isEmpty() || localBatchBuffer.size() 
> 0) {
-                                       logger.info("Queue is not empty. Will 
retry. queue.size)="
-                                                       + queue.size() + ", 
localBatchBuffer.size()="
-                                                       + 
localBatchBuffer.size());
-                               } else {
-                                       break;
-                               }
-                       }
-               }
-
-               logger.info("Exiting consumerThread. Queue=" + getName() + ", 
dest="
-                               + consumer.getName());
-               try {
-                       // Call stop on the consumer
-                       consumer.stop();
-                       if (fileSpoolerEnabled) {
-                               fileSpooler.stop();
-                       }
-               } catch (Throwable t) {
-                       logger.error("Error while calling stop on consumer.", 
t);
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
deleted file mode 100644
index 11c32ca..0000000
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java
+++ /dev/null
@@ -1,70 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.audit.provider;
-
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-/**
- * This class needs to be extended by anyone who wants to build custom
- * destination
- */
-public abstract class AuditDestination extends BaseAuditProvider {
-       private static final Log logger = 
LogFactory.getLog(AuditDestination.class);
-
-       public AuditDestination() {
-               logger.info("AuditDestination() enter");
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see
-        * 
org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties,
-        * java.lang.String)
-        */
-       @Override
-       public void init(Properties prop, String basePropertyName) {
-               super.init(prop, basePropertyName);
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
-        */
-       @Override
-       public boolean isFlushPending() {
-               return false;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see org.apache.ranger.audit.provider.AuditProvider#flush()
-        */
-       @Override
-       public void flush() {
-
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
deleted file mode 100644
index 8b006de..0000000
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java
+++ /dev/null
@@ -1,875 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.audit.provider;
-
-import java.io.BufferedReader;
-import java.io.BufferedWriter;
-import java.io.File;
-import java.io.FileFilter;
-import java.io.FileNotFoundException;
-import java.io.FileReader;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.LinkedTransferQueue;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.audit.model.AuditEventBase;
-
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
-
-/**
- * This class temporarily stores logs in file system if the destination is
- * overloaded or down
- */
-public class AuditFileSpool implements Runnable {
-       private static final Log logger = 
LogFactory.getLog(AuditFileSpool.class);
-
-       public enum SPOOL_FILE_STATUS {
-               pending, write_inprogress, read_inprogress, done
-       }
-
-       public static final String PROP_FILE_SPOOL_LOCAL_DIR = "filespool.dir";
-       public static final String PROP_FILE_SPOOL_LOCAL_FILE_NAME = 
"filespool.filename.format";
-       public static final String PROP_FILE_SPOOL_ARCHIVE_DIR = 
"filespool.archive.dir";
-       public static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = 
"filespool.archive.max.files";
-       public static final String PROP_FILE_SPOOL_FILENAME_PREFIX = 
"filespool.file.prefix";
-       public static final String PROP_FILE_SPOOL_FILE_ROLLOVER = 
"filespool.file.rollover.sec";
-       public static final String PROP_FILE_SPOOL_INDEX_FILE = 
"filespool.index.filename";
-       // public static final String PROP_FILE_SPOOL_INDEX_DONE_FILE =
-       // "filespool.index.done_filename";
-       public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = 
"filespool.destination.retry.ms";
-
-       AuditProvider queueProvider = null;
-       AuditProvider consumerProvider = null;
-
-       BlockingQueue<AuditIndexRecord> indexQueue = new 
LinkedTransferQueue<AuditIndexRecord>();
-
-       // Folder and File attributes
-       File logFolder = null;
-       String logFileNameFormat = null;
-       File archiveFolder = null;
-       String fileNamePrefix = null;
-       String indexFileName = null;
-       File indexFile = null;
-       String indexDoneFileName = null;
-       File indexDoneFile = null;
-       int retryDestinationMS = 30 * 1000; // Default 30 seconds
-       int fileRolloverSec = 24 * 60 * 60; // In seconds
-       int maxArchiveFiles = 100;
-
-       int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
-       long lastErrorLogMS = 0;
-
-       List<AuditIndexRecord> indexRecords = new ArrayList<AuditIndexRecord>();
-
-       boolean isPending = false;
-       long lastAttemptTime = 0;
-       boolean initDone = false;
-
-       PrintWriter logWriter = null;
-       AuditIndexRecord currentWriterIndexRecord = null;
-       AuditIndexRecord currentConsumerIndexRecord = null;
-
-       BufferedReader logReader = null;
-
-       Thread destinationThread = null;
-
-       boolean isWriting = true;
-       boolean isDrain = false;
-       boolean isDestDown = true;
-
-       private static Gson gson = null;
-
-       public AuditFileSpool(AuditProvider queueProvider,
-                       AuditProvider consumerProvider) {
-               this.queueProvider = queueProvider;
-               this.consumerProvider = consumerProvider;
-       }
-
-       public void init(Properties prop) {
-               init(prop, null);
-       }
-
-       public void init(Properties props, String basePropertyName) {
-               if (initDone) {
-                       logger.error("init() called more than once. 
queueProvider="
-                                       + queueProvider.getName() + ", 
consumerProvider="
-                                       + consumerProvider.getName());
-                       return;
-               }
-               String propPrefix = "xasecure.audit.filespool";
-               if (basePropertyName != null) {
-                       propPrefix = basePropertyName;
-               }
-
-               try {
-                       gson = new GsonBuilder().setDateFormat("yyyy-MM-dd 
HH:mm:ss.SSS")
-                                       .create();
-
-                       // Initial folder and file properties
-                       String logFolderProp = 
MiscUtil.getStringProperty(props, propPrefix
-                                       + "." + PROP_FILE_SPOOL_LOCAL_DIR);
-                       logFileNameFormat = MiscUtil.getStringProperty(props,
-                                       basePropertyName + "." + 
PROP_FILE_SPOOL_LOCAL_FILE_NAME);
-                       String archiveFolderProp = 
MiscUtil.getStringProperty(props,
-                                       propPrefix + "." + 
PROP_FILE_SPOOL_ARCHIVE_DIR);
-                       fileNamePrefix = MiscUtil.getStringProperty(props, 
propPrefix + "."
-                                       + PROP_FILE_SPOOL_FILENAME_PREFIX);
-                       indexFileName = MiscUtil.getStringProperty(props, 
propPrefix + "."
-                                       + PROP_FILE_SPOOL_INDEX_FILE);
-                       retryDestinationMS = MiscUtil.getIntProperty(props, 
propPrefix
-                                       + "." + PROP_FILE_SPOOL_DEST_RETRY_MS, 
retryDestinationMS);
-                       fileRolloverSec = MiscUtil.getIntProperty(props, 
propPrefix + "."
-                                       + PROP_FILE_SPOOL_FILE_ROLLOVER, 
fileRolloverSec);
-                       maxArchiveFiles = MiscUtil.getIntProperty(props, 
propPrefix + "."
-                                       + 
PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, maxArchiveFiles);
-
-                       logger.info("retryDestinationMS=" + retryDestinationMS
-                                       + ", queueName=" + 
queueProvider.getName());
-                       logger.info("fileRolloverSec=" + fileRolloverSec + ", 
queueName="
-                                       + queueProvider.getName());
-                       logger.info("maxArchiveFiles=" + maxArchiveFiles + ", 
queueName="
-                                       + queueProvider.getName());
-
-                       if (logFolderProp == null || logFolderProp.isEmpty()) {
-                               logger.error("Audit spool folder is not 
configured. Please set "
-                                               + propPrefix
-                                               + "."
-                                               + PROP_FILE_SPOOL_LOCAL_DIR
-                                               + ". queueName=" + 
queueProvider.getName());
-                               return;
-                       }
-                       logFolder = new File(logFolderProp);
-                       if (!logFolder.isDirectory()) {
-                               logFolder.mkdirs();
-                               if (!logFolder.isDirectory()) {
-                                       logger.error("File Spool folder not 
found and can't be created. folder="
-                                                       + 
logFolder.getAbsolutePath()
-                                                       + ", queueName="
-                                                       + 
queueProvider.getName());
-                                       return;
-                               }
-                       }
-                       logger.info("logFolder=" + logFolder + ", queueName="
-                                       + queueProvider.getName());
-
-                       if (logFileNameFormat == null || 
logFileNameFormat.isEmpty()) {
-                               logFileNameFormat = "spool_" + "%app-type%" + 
"_"
-                                               + "%time:yyyyMMdd-HHmm.ss%.log";
-                       }
-                       logger.info("logFileNameFormat=" + logFileNameFormat
-                                       + ", queueName=" + 
queueProvider.getName());
-
-                       if (archiveFolderProp == null || 
archiveFolderProp.isEmpty()) {
-                               archiveFolder = new File(logFolder, "archive");
-                       } else {
-                               archiveFolder = new File(archiveFolderProp);
-                       }
-                       if (!archiveFolder.isDirectory()) {
-                               archiveFolder.mkdirs();
-                               if (!archiveFolder.isDirectory()) {
-                                       logger.error("File Spool archive folder 
not found and can't be created. folder="
-                                                       + 
archiveFolder.getAbsolutePath()
-                                                       + ", queueName="
-                                                       + 
queueProvider.getName());
-                                       return;
-                               }
-                       }
-                       logger.info("archiveFolder=" + archiveFolder + ", 
queueName="
-                                       + queueProvider.getName());
-
-                       if (indexFileName == null || indexFileName.isEmpty()) {
-                               indexFileName = "index_" + fileNamePrefix + 
".json";
-                       }
-
-                       indexFile = new File(logFolder, indexFileName);
-                       if (!indexFile.exists()) {
-                               indexFile.createNewFile();
-                       }
-                       logger.info("indexFile=" + indexFile + ", queueName="
-                                       + queueProvider.getName());
-
-                       int lastDot = indexFileName.lastIndexOf('.');
-                       indexDoneFileName = indexFileName.substring(0, lastDot)
-                                       + "_closed.json";
-                       indexDoneFile = new File(logFolder, indexDoneFileName);
-                       if (!indexDoneFile.exists()) {
-                               indexDoneFile.createNewFile();
-                       }
-                       logger.info("indexDoneFile=" + indexDoneFile + ", 
queueName="
-                                       + queueProvider.getName());
-
-                       // Load index file
-                       loadIndexFile();
-                       for (AuditIndexRecord auditIndexRecord : indexRecords) {
-                               if 
(!auditIndexRecord.status.equals(SPOOL_FILE_STATUS.done)) {
-                                       isPending = true;
-                               }
-                               if (auditIndexRecord.status
-                                               
.equals(SPOOL_FILE_STATUS.write_inprogress)) {
-                                       currentWriterIndexRecord = 
auditIndexRecord;
-                                       logger.info("currentWriterIndexRecord="
-                                                       + 
currentWriterIndexRecord.filePath
-                                                       + ", queueName=" + 
queueProvider.getName());
-                               }
-                               if (auditIndexRecord.status
-                                               
.equals(SPOOL_FILE_STATUS.read_inprogress)) {
-                                       indexQueue.add(auditIndexRecord);
-                               }
-                       }
-                       printIndex();
-                       // One more loop to add the rest of the pending records 
in reverse
-                       // order
-                       for (int i = 0; i < indexRecords.size(); i++) {
-                               AuditIndexRecord auditIndexRecord = 
indexRecords.get(i);
-                               if 
(auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) {
-                                       File consumerFile = new 
File(auditIndexRecord.filePath);
-                                       if (!consumerFile.exists()) {
-                                               logger.error("INIT: Consumer 
file="
-                                                               + 
consumerFile.getPath() + " not found.");
-                                               System.exit(1);
-                                       }
-                                       indexQueue.add(auditIndexRecord);
-                               }
-                       }
-
-               } catch (Throwable t) {
-                       logger.fatal("Error initializing File Spooler. queue="
-                                       + queueProvider.getName(), t);
-                       return;
-               }
-               initDone = true;
-       }
-
-       /**
-        * Start looking for outstanding logs and update status according.
-        */
-       public void start() {
-               if (!initDone) {
-                       logger.error("Cannot start Audit File Spooler. 
Initilization not done yet. queueName="
-                                       + queueProvider.getName());
-                       return;
-               }
-
-               logger.info("Starting writerThread, queueName="
-                               + queueProvider.getName() + ", consumer="
-                               + consumerProvider.getName());
-
-               // Let's start the thread to read
-               destinationThread = new Thread(this, queueProvider.getName()
-                               + "_destWriter");
-               destinationThread.setDaemon(true);
-               destinationThread.start();
-       }
-
-       public void stop() {
-               if (!initDone) {
-                       logger.error("Cannot stop Audit File Spooler. 
Initilization not done. queueName="
-                                       + queueProvider.getName());
-                       return;
-               }
-               logger.info("Stop called, queueName=" + queueProvider.getName()
-                               + ", consumer=" + consumerProvider.getName());
-
-               isDrain = true;
-               flush();
-
-               PrintWriter out = getOpenLogFileStream();
-               if (out != null) {
-                       // If write is still going on, then let's give it 
enough time to
-                       // complete
-                       for (int i = 0; i < 3; i++) {
-                               if (isWriting) {
-                                       try {
-                                               Thread.sleep(1000);
-                                       } catch (InterruptedException e) {
-                                               // ignore
-                                       }
-                                       continue;
-                               }
-                               try {
-                                       logger.info("Closing open file, 
queueName="
-                                                       + 
queueProvider.getName() + ", consumer="
-                                                       + 
consumerProvider.getName());
-
-                                       out.flush();
-                                       out.close();
-                               } catch (Throwable t) {
-                                       logger.debug("Error closing spool out 
file.", t);
-                               }
-                       }
-               }
-               try {
-                       destinationThread.interrupt();
-               } catch (Throwable e) {
-                       // ignore
-               }
-       }
-
-       public void flush() {
-               if (!initDone) {
-                       logger.error("Cannot flush Audit File Spooler. 
Initilization not done. queueName="
-                                       + queueProvider.getName());
-                       return;
-               }
-               PrintWriter out = getOpenLogFileStream();
-               if (out != null) {
-                       out.flush();
-               }
-       }
-
-       /**
-        * If any files are still not processed. Also, if the destination is not
-        * reachable
-        * 
-        * @return
-        */
-       public boolean isPending() {
-               if (!initDone) {
-                       logError("isPending(): File Spooler not initialized. 
queueName="
-                                       + queueProvider.getName());
-                       return false;
-               }
-
-               return isPending;
-       }
-
-       /**
-        * Milliseconds from last attempt time
-        * 
-        * @return
-        */
-       public long getLastAttemptTimeDelta() {
-               if (lastAttemptTime == 0) {
-                       return 0;
-               }
-               return System.currentTimeMillis() - lastAttemptTime;
-       }
-
-       synchronized public void stashLogs(AuditEventBase event) {
-               if (isDrain) {
-                       // Stop has been called, so this method shouldn't be 
called
-                       logger.error("stashLogs() is called after stop is 
called. event="
-                                       + event);
-                       return;
-               }
-               try {
-                       isWriting = true;
-                       PrintWriter logOut = getLogFileStream();
-                       // Convert event to json
-                       String jsonStr = MiscUtil.stringify(event);
-                       logOut.println(jsonStr);
-                       isPending = true;
-               } catch (Exception ex) {
-                       logger.error("Error writing to file. event=" + event, 
ex);
-               } finally {
-                       isWriting = false;
-               }
-
-       }
-
-       synchronized public void stashLogs(Collection<AuditEventBase> events) {
-               for (AuditEventBase event : events) {
-                       stashLogs(event);
-               }
-               flush();
-       }
-
-       synchronized public void stashLogsString(String event) {
-               if (isDrain) {
-                       // Stop has been called, so this method shouldn't be 
called
-                       logger.error("stashLogs() is called after stop is 
called. event="
-                                       + event);
-                       return;
-               }
-               try {
-                       isWriting = true;
-                       PrintWriter logOut = getLogFileStream();
-                       logOut.println(event);
-               } catch (Exception ex) {
-                       logger.error("Error writing to file. event=" + event, 
ex);
-               } finally {
-                       isWriting = false;
-               }
-
-       }
-
-       synchronized public void stashLogsString(Collection<String> events) {
-               for (String event : events) {
-                       stashLogsString(event);
-               }
-               flush();
-       }
-
-       /**
-        * This return the current file. If there are not current open output 
file,
-        * then it will return null
-        * 
-        * @return
-        * @throws Exception
-        */
-       synchronized private PrintWriter getOpenLogFileStream() {
-               return logWriter;
-       }
-
-       /**
-        * @return
-        * @throws Exception
-        */
-       synchronized private PrintWriter getLogFileStream() throws Exception {
-               closeFileIfNeeded();
-
-               // Either there are no open log file or the previous one has 
been rolled
-               // over
-               if (currentWriterIndexRecord == null) {
-                       Date currentTime = new Date();
-                       // Create a new file
-                       String fileName = 
MiscUtil.replaceTokens(logFileNameFormat,
-                                       currentTime.getTime());
-                       String newFileName = fileName;
-                       File outLogFile = null;
-                       int i = 0;
-                       while (true) {
-                               outLogFile = new File(logFolder, newFileName);
-                               File archiveLogFile = new File(archiveFolder, 
newFileName);
-                               if (!outLogFile.exists() && 
!archiveLogFile.exists()) {
-                                       break;
-                               }
-                               i++;
-                               int lastDot = fileName.lastIndexOf('.');
-                               String baseName = fileName.substring(0, 
lastDot);
-                               String extension = fileName.substring(lastDot);
-                               newFileName = baseName + "." + i + extension;
-                       }
-                       fileName = newFileName;
-                       logger.info("Creating new file. queueName="
-                                       + queueProvider.getName() + ", 
fileName=" + fileName);
-                       // Open the file
-                       logWriter = new PrintWriter(new BufferedWriter(new 
FileWriter(
-                                       outLogFile)));
-
-                       AuditIndexRecord tmpIndexRecord = new 
AuditIndexRecord();
-
-                       tmpIndexRecord.id = MiscUtil.generateUniqueId();
-                       tmpIndexRecord.filePath = outLogFile.getPath();
-                       tmpIndexRecord.status = 
SPOOL_FILE_STATUS.write_inprogress;
-                       tmpIndexRecord.fileCreateTime = currentTime;
-                       tmpIndexRecord.lastAttempt = true;
-                       currentWriterIndexRecord = tmpIndexRecord;
-                       indexRecords.add(currentWriterIndexRecord);
-                       saveIndexFile();
-
-               } else {
-                       if (logWriter == null) {
-                               // This means the process just started. We need 
to open the file
-                               // in append mode.
-                               logger.info("Opening existing file for append. 
queueName="
-                                               + queueProvider.getName() + ", 
fileName="
-                                               + 
currentWriterIndexRecord.filePath);
-                               logWriter = new PrintWriter(new 
BufferedWriter(new FileWriter(
-                                               
currentWriterIndexRecord.filePath, true)));
-                       }
-               }
-               return logWriter;
-       }
-
-       synchronized private void closeFileIfNeeded() throws 
FileNotFoundException,
-                       IOException {
-               // Is there file open to write or there are no pending file, 
then close
-               // the active file
-               if (currentWriterIndexRecord != null) {
-                       // Check whether the file needs to rolled
-                       boolean closeFile = false;
-                       if (indexRecords.size() == 1) {
-                               closeFile = true;
-                               logger.info("Closing file. Only one open file. 
queueName="
-                                               + queueProvider.getName() + ", 
fileName="
-                                               + 
currentWriterIndexRecord.filePath);
-                       } else if (System.currentTimeMillis()
-                                       - 
currentWriterIndexRecord.fileCreateTime.getTime() > fileRolloverSec * 1000) {
-                               closeFile = true;
-                               logger.info("Closing file. Rolling over. 
queueName="
-                                               + queueProvider.getName() + ", 
fileName="
-                                               + 
currentWriterIndexRecord.filePath);
-                       }
-                       if (closeFile) {
-                               // Roll the file
-                               if (logWriter != null) {
-                                       logWriter.flush();
-                                       logWriter.close();
-                                       logWriter = null;
-                               }
-                               currentWriterIndexRecord.status = 
SPOOL_FILE_STATUS.pending;
-                               currentWriterIndexRecord.writeCompleteTime = 
new Date();
-                               saveIndexFile();
-                               logger.info("Adding file to queue. queueName="
-                                               + queueProvider.getName() + ", 
fileName="
-                                               + 
currentWriterIndexRecord.filePath);
-                               indexQueue.add(currentWriterIndexRecord);
-                               currentWriterIndexRecord = null;
-                       }
-               }
-       }
-
-       /**
-        * Load the index file
-        * 
-        * @throws IOException
-        */
-       void loadIndexFile() throws IOException {
-               logger.info("Loading index file. fileName=" + 
indexFile.getPath());
-               BufferedReader br = new BufferedReader(new 
FileReader(indexFile));
-               indexRecords.clear();
-               String line;
-               while ((line = br.readLine()) != null) {
-                       if (!line.isEmpty() && !line.startsWith("#")) {
-                               AuditIndexRecord record = gson.fromJson(line,
-                                               AuditIndexRecord.class);
-                               indexRecords.add(record);
-                       }
-               }
-               br.close();
-       }
-
-       synchronized void printIndex() {
-               logger.info("INDEX printIndex() ==== START");
-               Iterator<AuditIndexRecord> iter = indexRecords.iterator();
-               while (iter.hasNext()) {
-                       AuditIndexRecord record = iter.next();
-                       logger.info("INDEX=" + record + ", isFileExist="
-                                       + (new File(record.filePath).exists()));
-               }
-               logger.info("INDEX printIndex() ==== END");
-       }
-
-       synchronized void removeIndexRecord(AuditIndexRecord indexRecord)
-                       throws FileNotFoundException, IOException {
-               Iterator<AuditIndexRecord> iter = indexRecords.iterator();
-               while (iter.hasNext()) {
-                       AuditIndexRecord record = iter.next();
-                       if (record.id.equals(indexRecord.id)) {
-                               logger.info("Removing file from index. file=" + 
record.filePath
-                                               + ", queueName=" + 
queueProvider.getName()
-                                               + ", consumer=" + 
consumerProvider.getName());
-
-                               iter.remove();
-                               appendToDoneFile(record);
-                       }
-               }
-               saveIndexFile();
-       }
-
-       synchronized void saveIndexFile() throws FileNotFoundException, 
IOException {
-               PrintWriter out = new PrintWriter(indexFile);
-               for (AuditIndexRecord auditIndexRecord : indexRecords) {
-                       out.println(gson.toJson(auditIndexRecord));
-               }
-               out.close();
-               // printIndex();
-
-       }
-
-       void appendToDoneFile(AuditIndexRecord indexRecord)
-                       throws FileNotFoundException, IOException {
-               logger.info("Moving to done file. " + indexRecord.filePath
-                               + ", queueName=" + queueProvider.getName() + ", 
consumer="
-                               + consumerProvider.getName());
-               String line = gson.toJson(indexRecord);
-               PrintWriter out = new PrintWriter(new BufferedWriter(new 
FileWriter(
-                               indexDoneFile, true)));
-               out.println(line);
-               out.flush();
-               out.close();
-
-               // Move to archive folder
-               File logFile = null;
-               File archiveFile = null;
-               try {
-                       logFile = new File(indexRecord.filePath);
-                       String fileName = logFile.getName();
-                       archiveFile = new File(archiveFolder, fileName);
-                       logger.info("Moving logFile " + logFile + " to " + 
archiveFile);
-                       logFile.renameTo(archiveFile);
-               } catch (Throwable t) {
-                       logger.error("Error moving log file to archive folder. 
logFile="
-                                       + logFile + ", archiveFile=" + 
archiveFile, t);
-               }
-
-               archiveFile = null;
-               try {
-                       // Remove old files
-                       File[] logFiles = archiveFolder.listFiles(new 
FileFilter() {
-                               public boolean accept(File pathname) {
-                                       return 
pathname.getName().toLowerCase().endsWith(".log");
-                               }
-                       });
-
-                       if (logFiles.length > maxArchiveFiles) {
-                               int filesToDelete = logFiles.length - 
maxArchiveFiles;
-                               BufferedReader br = new BufferedReader(new 
FileReader(
-                                               indexDoneFile));
-                               try {
-                                       int filesDeletedCount = 0;
-                                       while ((line = br.readLine()) != null) {
-                                               if (!line.isEmpty() && 
!line.startsWith("#")) {
-                                                       AuditIndexRecord record 
= gson.fromJson(line,
-                                                                       
AuditIndexRecord.class);
-                                                       logFile = new 
File(record.filePath);
-                                                       String fileName = 
logFile.getName();
-                                                       archiveFile = new 
File(archiveFolder, fileName);
-                                                       if 
(archiveFile.exists()) {
-                                                               
logger.info("Deleting archive file "
-                                                                               
+ archiveFile);
-                                                               boolean ret = 
archiveFile.delete();
-                                                               if (!ret) {
-                                                                       
logger.error("Error deleting archive file. archiveFile="
-                                                                               
        + archiveFile);
-                                                               }
-                                                               
filesDeletedCount++;
-                                                               if 
(filesDeletedCount >= filesToDelete) {
-                                                                       
logger.info("Deleted " + filesDeletedCount
-                                                                               
        + " files");
-                                                                       break;
-                                                               }
-                                                       }
-                                               }
-                                       }
-                               } finally {
-                                       br.close();
-                               }
-                       }
-               } catch (Throwable t) {
-                       logger.error("Error deleting older archive file. 
archiveFile="
-                                       + archiveFile, t);
-               }
-
-       }
-
-       void logError(String msg) {
-               long currTimeMS = System.currentTimeMillis();
-               if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
-                       logger.error(msg);
-                       lastErrorLogMS = currTimeMS;
-               }
-       }
-
-       class AuditIndexRecord {
-               String id;
-               String filePath;
-               int linePosition = 0;
-               SPOOL_FILE_STATUS status = SPOOL_FILE_STATUS.write_inprogress;
-               Date fileCreateTime;
-               Date writeCompleteTime;
-               Date doneCompleteTime;
-               Date lastSuccessTime;
-               Date lastFailedTime;
-               int failedAttemptCount = 0;
-               boolean lastAttempt = false;
-
-               @Override
-               public String toString() {
-                       return "AuditIndexRecord [id=" + id + ", filePath=" + 
filePath
-                                       + ", linePosition=" + linePosition + ", 
status=" + status
-                                       + ", fileCreateTime=" + fileCreateTime
-                                       + ", writeCompleteTime=" + 
writeCompleteTime
-                                       + ", doneCompleteTime=" + 
doneCompleteTime
-                                       + ", lastSuccessTime=" + lastSuccessTime
-                                       + ", lastFailedTime=" + lastFailedTime
-                                       + ", failedAttemptCount=" + 
failedAttemptCount
-                                       + ", lastAttempt=" + lastAttempt + "]";
-               }
-
-       }
-
-       class AuditFileSpoolAttempt {
-               Date attemptTime;
-               String status;
-       }
-
-       /*
-        * (non-Javadoc)
-        * 
-        * @see java.lang.Runnable#run()
-        */
-       @Override
-       public void run() {
-               while (true) {
-                       try {
-                               // Let's pause between each iteration
-                               if (currentConsumerIndexRecord == null) {
-                                       currentConsumerIndexRecord = 
indexQueue.poll(
-                                                       retryDestinationMS, 
TimeUnit.MILLISECONDS);
-                               } else {
-                                       Thread.sleep(retryDestinationMS);
-                               }
-
-                               if (isDrain) {
-                                       // Need to exit
-                                       break;
-                               }
-                               if (currentConsumerIndexRecord == null) {
-                                       closeFileIfNeeded();
-                                       continue;
-                               }
-
-                               boolean isRemoveIndex = false;
-                               File consumerFile = new File(
-                                               
currentConsumerIndexRecord.filePath);
-                               if (!consumerFile.exists()) {
-                                       logger.error("Consumer file=" + 
consumerFile.getPath()
-                                                       + " not found.");
-                                       printIndex();
-                                       isRemoveIndex = true;
-                               } else {
-                                       // Let's open the file to write
-                                       BufferedReader br = new 
BufferedReader(new FileReader(
-                                                       
currentConsumerIndexRecord.filePath));
-                                       try {
-                                               int startLine = 
currentConsumerIndexRecord.linePosition;
-                                               String line;
-                                               int currLine = 0;
-                                               boolean isResumed = false;
-                                               List<String> lines = new 
ArrayList<String>();
-                                               while ((line = br.readLine()) 
!= null) {
-                                                       currLine++;
-                                                       if (currLine < 
startLine) {
-                                                               continue;
-                                                       }
-                                                       lines.add(line);
-                                                       if (lines.size() == 
queueProvider.getMaxBatchSize()) {
-                                                               boolean ret = 
sendEvent(lines,
-                                                                               
currentConsumerIndexRecord, currLine);
-                                                               if (!ret) {
-                                                                       throw 
new Exception("Destination down");
-                                                               } else {
-                                                                       if 
(!isResumed) {
-                                                                               
logger.info("Started writing to destination. file="
-                                                                               
                + currentConsumerIndexRecord.filePath
-                                                                               
                + ", queueName="
-                                                                               
                + queueProvider.getName()
-                                                                               
                + ", consumer="
-                                                                               
                + consumerProvider.getName());
-                                                                       }
-                                                               }
-                                                               lines.clear();
-                                                       }
-                                               }
-                                               if (lines.size() > 0) {
-                                                       boolean ret = 
sendEvent(lines,
-                                                                       
currentConsumerIndexRecord, currLine);
-                                                       if (!ret) {
-                                                               throw new 
Exception("Destination down");
-                                                       } else {
-                                                               if (!isResumed) 
{
-                                                                       
logger.info("Started writing to destination. file="
-                                                                               
        + currentConsumerIndexRecord.filePath
-                                                                               
        + ", queueName="
-                                                                               
        + queueProvider.getName()
-                                                                               
        + ", consumer="
-                                                                               
        + consumerProvider.getName());
-                                                               }
-                                                       }
-                                                       lines.clear();
-                                               }
-                                               logger.info("Done reading file. 
file="
-                                                               + 
currentConsumerIndexRecord.filePath
-                                                               + ", 
queueName=" + queueProvider.getName()
-                                                               + ", consumer=" 
+ consumerProvider.getName());
-                                               // The entire file is read
-                                               
currentConsumerIndexRecord.status = SPOOL_FILE_STATUS.done;
-                                               
currentConsumerIndexRecord.doneCompleteTime = new Date();
-                                               
currentConsumerIndexRecord.lastAttempt = true;
-
-                                               isRemoveIndex = true;
-                                       } catch (Exception ex) {
-                                               isDestDown = true;
-                                               logError("Destination down. 
queueName="
-                                                               + 
queueProvider.getName() + ", consumer="
-                                                               + 
consumerProvider.getName());
-                                               lastAttemptTime = 
System.currentTimeMillis();
-                                               // Update the index file
-                                               
currentConsumerIndexRecord.lastFailedTime = new Date();
-                                               
currentConsumerIndexRecord.failedAttemptCount++;
-                                               
currentConsumerIndexRecord.lastAttempt = false;
-                                               saveIndexFile();
-                                       } finally {
-                                               br.close();
-                                       }
-                               }
-                               if (isRemoveIndex) {
-                                       // Remove this entry from index
-                                       
removeIndexRecord(currentConsumerIndexRecord);
-                                       currentConsumerIndexRecord = null;
-                                       closeFileIfNeeded();
-                               }
-                       } catch (Throwable t) {
-                               logger.error("Exception in destination writing 
thread.", t);
-                       }
-               }
-               logger.info("Exiting file spooler. provider=" + 
queueProvider.getName()
-                               + ", consumer=" + consumerProvider.getName());
-       }
-
-       private boolean sendEvent(List<String> lines, AuditIndexRecord 
indexRecord,
-                       int currLine) {
-               boolean ret = true;
-               try {
-                       ret = consumerProvider.logJSON(lines);
-                       if (!ret) {
-                               // Need to log error after fixed interval
-                               logError("Error sending logs to consumer. 
provider="
-                                               + queueProvider.getName() + ", 
consumer="
-                                               + consumerProvider.getName());
-                       } else {
-                               // Update index and save
-                               indexRecord.linePosition = currLine;
-                               indexRecord.status = 
SPOOL_FILE_STATUS.read_inprogress;
-                               indexRecord.lastSuccessTime = new Date();
-                               indexRecord.lastAttempt = true;
-                               saveIndexFile();
-
-                               if (isDestDown) {
-                                       isDestDown = false;
-                                       logger.info("Destination up now. " + 
indexRecord.filePath
-                                                       + ", queueName=" + 
queueProvider.getName()
-                                                       + ", consumer=" + 
consumerProvider.getName());
-                               }
-                       }
-               } catch (Throwable t) {
-                       logger.error("Error while sending logs to consumer. 
provider="
-                                       + queueProvider.getName() + ", 
consumer="
-                                       + consumerProvider.getName() + ", log=" 
+ lines, t);
-               }
-
-               return ret;
-       }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index 13b3142..a67f7e0 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -24,9 +24,14 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.FileAuditDestination;
+import org.apache.ranger.audit.destination.HDFSAuditDestination;
 import org.apache.ranger.audit.provider.hdfs.HdfsAuditProvider;
 import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider;
 import org.apache.ranger.audit.provider.solr.SolrAuditProvider;
+import org.apache.ranger.audit.queue.AuditAsyncQueue;
+import org.apache.ranger.audit.queue.AuditBatchQueue;
+import org.apache.ranger.audit.queue.AuditSummaryQueue;
 
 /*
  * TODO:
@@ -90,10 +95,11 @@ public class AuditProviderFactory {
                LOG.info("AuditProviderFactory: initializing..");
 
                if (mInitDone) {
-                       LOG.warn("AuditProviderFactory.init(): already 
initialized!",
+                       LOG.warn(
+                                       "AuditProviderFactory.init(): already 
initialized! Will try to re-initialize",
                                        new Exception());
 
-                       return;
+                       // return;
                }
                mInitDone = true;
 
@@ -125,7 +131,7 @@ public class AuditProviderFactory {
 
                for (Object propNameObj : props.keySet()) {
                        String propName = propNameObj.toString();
-                       if (propName.length() <= AUDIT_DEST_BASE.length() + 1) {
+                       if (!propName.startsWith(AUDIT_DEST_BASE)) {
                                continue;
                        }
                        String destName = 
propName.substring(AUDIT_DEST_BASE.length() + 1);
@@ -152,9 +158,14 @@ public class AuditProviderFactory {
 
                                String queueName = 
MiscUtil.getStringProperty(props,
                                                destPropPrefix + "." + 
BaseAuditProvider.PROP_QUEUE);
-                               if( queueName == null || queueName.isEmpty()) {
+                               if (queueName == null || queueName.isEmpty()) {
+                                       LOG.info(destPropPrefix + "."
+                                                       + 
BaseAuditProvider.PROP_QUEUE
+                                                       + " is not set. Setting 
queue to batch for "
+                                                       + destName);
                                        queueName = "batch";
                                }
+                               LOG.info("queue for " + destName + " is " + 
queueName);
                                if (queueName != null && !queueName.isEmpty()
                                                && 
!queueName.equalsIgnoreCase("none")) {
                                        String queuePropPrefix = destPropPrefix 
+ "." + queueName;
@@ -184,24 +195,55 @@ public class AuditProviderFactory {
                        }
                }
                if (providers.size() > 0) {
-                       LOG.info("Using v2 audit configuration");
+                       LOG.info("Using v3 audit configuration");
                        AuditAsyncQueue asyncQueue = new AuditAsyncQueue();
-                       String propPrefix = 
BaseAuditProvider.PROP_DEFAULT_PREFIX + "." + "async";
+                       String propPrefix = 
BaseAuditProvider.PROP_DEFAULT_PREFIX + "."
+                                       + "async";
                        asyncQueue.init(props, propPrefix);
 
+                       propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX;
+                       boolean summaryEnabled = 
MiscUtil.getBooleanProperty(props,
+                                       propPrefix + "." + "summary" + "." + 
"enabled", false);
+                       AuditSummaryQueue summaryQueue = null;
+                       if (summaryEnabled) {
+                               LOG.info("AuditSummaryQueue is enabled");
+                               summaryQueue = new AuditSummaryQueue();
+                               summaryQueue.init(props, propPrefix);
+                               asyncQueue.setConsumer(summaryQueue);
+                       } else {
+                               LOG.info("AuditSummaryQueue is disabled");
+                       }
+
                        if (providers.size() == 1) {
-                               asyncQueue.setConsumer(providers.get(0));
+                               if (summaryEnabled) {
+                                       LOG.info("Setting " + 
providers.get(0).getName()
+                                                       + " as consumer to 
AuditSummaryQueue");
+                                       
summaryQueue.setConsumer(providers.get(0));
+                               } else {
+                                       LOG.info("Setting " + 
providers.get(0).getName()
+                                                       + " as consumer to " + 
asyncQueue.getName());
+                                       
asyncQueue.setConsumer(providers.get(0));
+                               }
                        } else {
                                MultiDestAuditProvider multiDestProvider = new 
MultiDestAuditProvider();
                                multiDestProvider.init(props);
                                multiDestProvider.addAuditProviders(providers);
-                               asyncQueue.setConsumer(multiDestProvider);
+                               if (summaryEnabled) {
+                                       LOG.info("Setting " + 
multiDestProvider.getName()
+                                                       + " as consumer to 
AuditSummaryQueue");
+                                       
summaryQueue.setConsumer(multiDestProvider);
+                               } else {
+                                       LOG.info("Setting " + 
multiDestProvider.getName()
+                                                       + " as consumer to " + 
asyncQueue.getName());
+                                       
asyncQueue.setConsumer(multiDestProvider);
+                               }
                        }
 
                        mProvider = asyncQueue;
+                       LOG.info("Starting " + mProvider.getName());
                        mProvider.start();
                } else {
-                       LOG.info("No v2 audit configuration found. Trying v1 
audit configurations");
+                       LOG.info("No v3 audit configuration found. Trying v2 
audit configurations");
                        if (!isEnabled
                                        || !(isAuditToDbEnabled || 
isAuditToHdfsEnabled
                                                        || 
isAuditToKafkaEnabled || isAuditToLog4jEnabled
@@ -356,7 +398,7 @@ public class AuditProviderFactory {
                                                .newInstance();
                        } catch (Exception e) {
                                LOG.fatal("Can't instantiate audit class for 
providerName="
-                                               + providerName + ", className=" 
+ className);
+                                               + providerName + ", className=" 
+ className, e);
                        }
                } else {
                        if (providerName.equals("file")) {
@@ -372,7 +414,7 @@ public class AuditProviderFactory {
                        } else if (providerName.equals("log4j")) {
                                provider = new Log4jAuditProvider();
                        } else if (providerName.equals("batch")) {
-                               provider = new AuditBatchProcessor();
+                               provider = new AuditBatchQueue();
                        } else if (providerName.equals("async")) {
                                provider = new AuditAsyncQueue();
                        } else {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
index 576176c..85c207b 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.audit.queue.AuditFileSpool;
 
 import com.google.gson.GsonBuilder;
 

Reply via email to