Repository: incubator-ranger
Updated Branches:
  refs/heads/master 2f8bcd234 -> 4f3cea223


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/security-admin/src/test/java/org/apache/ranger/audit/TestAuditProcessor.java
----------------------------------------------------------------------
diff --git 
a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditProcessor.java 
b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditProcessor.java
deleted file mode 100644
index a023b9a..0000000
--- 
a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditProcessor.java
+++ /dev/null
@@ -1,786 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- * http://www.apache.org/licenses/LICENSE-2.0
- * 
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.ranger.audit;
-
-import static org.junit.Assert.*;
-
-import java.io.BufferedReader;
-import java.io.File;
-import java.io.FileReader;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Properties;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.ranger.audit.model.AuditEventBase;
-import org.apache.ranger.audit.model.AuthzAuditEvent;
-import org.apache.ranger.audit.provider.AuditAsyncQueue;
-import org.apache.ranger.audit.provider.AuditBatchProcessor;
-import org.apache.ranger.audit.provider.AuditDestination;
-import org.apache.ranger.audit.provider.AuditFileSpool;
-import org.apache.ranger.audit.provider.AuditProvider;
-import org.apache.ranger.audit.provider.AuditProviderFactory;
-import org.apache.ranger.audit.provider.BaseAuditProvider;
-import org.apache.ranger.audit.provider.FileAuditDestination;
-import org.apache.ranger.audit.provider.MiscUtil;
-import org.apache.ranger.audit.provider.MultiDestAuditProvider;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-public class TestAuditProcessor {
-
-       private static final Log logger = LogFactory
-                       .getLog(TestAuditProcessor.class);
-
-       @BeforeClass
-       public static void setUpBeforeClass() throws Exception {
-       }
-
-       @AfterClass
-       public static void tearDownAfterClass() throws Exception {
-       }
-
-       static private int seqNum = 0;
-
-       @Test
-       public void testAuditAsyncQueue() {
-               logger.debug("testAuditAsyncQueue()...");
-               TestConsumer testConsumer = new TestConsumer();
-               AuditAsyncQueue queue = new AuditAsyncQueue(testConsumer);
-               Properties props = new Properties();
-               queue.init(props);
-
-               queue.start();
-
-               int messageToSend = 10;
-               for (int i = 0; i < messageToSend; i++) {
-                       queue.log(createEvent());
-               }
-               queue.stop();
-               queue.waitToComplete();
-               // Let's wait for second
-               try {
-                       Thread.sleep(1000);
-               } catch (InterruptedException e) {
-                       // ignore
-               }
-               assertEquals(messageToSend, testConsumer.getCountTotal());
-               assertEquals(messageToSend, testConsumer.getSumTotal());
-               assertNull("Event not in sequnce", testConsumer.isInSequence());
-       }
-
-       @Test
-       public void testMultipleQueue() {
-               logger.debug("testAuditAsyncQueue()...");
-               int destCount = 3;
-               TestConsumer[] testConsumer = new TestConsumer[destCount];
-
-               MultiDestAuditProvider multiQueue = new 
MultiDestAuditProvider();
-               for (int i = 0; i < destCount; i++) {
-                       testConsumer[i] = new TestConsumer();
-                       multiQueue.addAuditProvider(testConsumer[i]);
-               }
-
-               AuditAsyncQueue queue = new AuditAsyncQueue(multiQueue);
-               Properties props = new Properties();
-               queue.init(props);
-               queue.start();
-
-               int messageToSend = 10;
-               for (int i = 0; i < messageToSend; i++) {
-                       queue.log(createEvent());
-               }
-               queue.stop();
-               queue.waitToComplete();
-               // Let's wait for second
-               try {
-                       Thread.sleep(1000);
-               } catch (InterruptedException e) {
-                       // ignore
-               }
-               for (int i = 0; i < destCount; i++) {
-                       assertEquals("consumer" + i, messageToSend,
-                                       testConsumer[i].getCountTotal());
-                       assertEquals("consumer" + i, messageToSend,
-                                       testConsumer[i].getSumTotal());
-
-               }
-       }
-
-       @Test
-       public void testAuditBatchProcessorBySize() {
-               logger.debug("testAuditBatchProcessor()...");
-               int messageToSend = 10;
-
-               String basePropName = "ranger.test.batch";
-               int batchSize = messageToSend / 3;
-               int expectedBatchSize = batchSize
-                               + (batchSize * 3 < messageToSend ? 1 : 0);
-               int queueSize = messageToSend * 2;
-               int intervalMS = messageToSend * 100; // Deliberately big 
interval
-               Properties props = new Properties();
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_BATCH_SIZE, ""
-                               + batchSize);
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_QUEUE_SIZE, ""
-                               + queueSize);
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_BATCH_INTERVAL,
-                               "" + intervalMS);
-
-               TestConsumer testConsumer = new TestConsumer();
-               AuditBatchProcessor queue = new 
AuditBatchProcessor(testConsumer);
-               queue.init(props, basePropName);
-               queue.start();
-
-               for (int i = 0; i < messageToSend; i++) {
-                       queue.log(createEvent());
-
-               }
-               // Let's wait for second
-               try {
-                       Thread.sleep(2000);
-               } catch (InterruptedException e) {
-                       // ignore
-               }
-
-               queue.waitToComplete();
-               queue.stop();
-               queue.waitToComplete();
-
-               assertEquals("Total count", messageToSend, 
testConsumer.getCountTotal());
-               assertEquals("Total sum", messageToSend, 
testConsumer.getSumTotal());
-               assertEquals("Total batch", expectedBatchSize,
-                               testConsumer.getBatchCount());
-               assertNull("Event not in sequnce", testConsumer.isInSequence());
-
-       }
-
-       @Test
-       public void testAuditBatchProcessorByTime() {
-               logger.debug("testAuditBatchProcessor()...");
-
-               int messageToSend = 10;
-
-               String basePropName = "ranger.test.batch";
-               int batchSize = messageToSend * 2; // Deliberately big size
-               int queueSize = messageToSend * 2;
-               int intervalMS = (1000 / messageToSend) * 3; // e.g (1000/10 * 
3) = 300
-                                                                               
                                // ms
-               int pauseMS = 1000 / messageToSend + 3; // e.g. 1000/10 -5 = 
95ms
-               int expectedBatchSize = (messageToSend * pauseMS) / intervalMS 
+ 1;
-
-               Properties props = new Properties();
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_BATCH_SIZE, ""
-                               + batchSize);
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_QUEUE_SIZE, ""
-                               + queueSize);
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_BATCH_INTERVAL,
-                               "" + intervalMS);
-
-               TestConsumer testConsumer = new TestConsumer();
-               AuditBatchProcessor queue = new 
AuditBatchProcessor(testConsumer);
-               queue.init(props, basePropName);
-               queue.start();
-
-               for (int i = 0; i < messageToSend; i++) {
-                       queue.log(createEvent());
-                       try {
-                               Thread.sleep(pauseMS);
-                       } catch (InterruptedException e) {
-                               // ignore
-                       }
-               }
-               // Let's wait for second
-               try {
-                       Thread.sleep(2000);
-               } catch (InterruptedException e) {
-                       // ignore
-               }
-               queue.waitToComplete();
-               queue.stop();
-               queue.waitToComplete();
-
-               assertEquals("Total count", messageToSend, 
testConsumer.getCountTotal());
-               assertEquals("Total sum", messageToSend, 
testConsumer.getSumTotal());
-               assertEquals("Total batch", expectedBatchSize,
-                               testConsumer.getBatchCount());
-               assertNull("Event not in sequnce", testConsumer.isInSequence());
-       }
-
-       @Test
-       public void testAuditBatchProcessorDestDown() {
-               logger.debug("testAuditBatchProcessorDestDown()...");
-               int messageToSend = 10;
-
-               String basePropName = "ranger.test.batch";
-               int batchSize = messageToSend / 3;
-               int queueSize = messageToSend * 2;
-               int intervalMS = Integer.MAX_VALUE; // Deliberately big interval
-               Properties props = new Properties();
-               props.put(basePropName + "." + BaseAuditProvider.PROP_NAME,
-                               "testAuditBatchProcessorDestDown");
-
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_BATCH_SIZE, ""
-                               + batchSize);
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_QUEUE_SIZE, ""
-                               + queueSize);
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_BATCH_INTERVAL,
-                               "" + intervalMS);
-
-               // Enable File Spooling
-               props.put(basePropName + "." + "filespool.enable", "" + true);
-               props.put(basePropName + "." + "filespool.dir", "target");
-
-               TestConsumer testConsumer = new TestConsumer();
-               testConsumer.isDown = true;
-
-               AuditBatchProcessor queue = new 
AuditBatchProcessor(testConsumer);
-               queue.init(props, basePropName);
-               queue.start();
-
-               for (int i = 0; i < messageToSend; i++) {
-                       queue.log(createEvent());
-
-               }
-               // Let's wait for second
-               try {
-                       Thread.sleep(2000);
-               } catch (InterruptedException e) {
-                       // ignore
-               }
-
-               queue.waitToComplete(5000);
-               queue.stop();
-               queue.waitToComplete();
-
-               assertEquals("Total count", 0, testConsumer.getCountTotal());
-               assertEquals("Total sum", 0, testConsumer.getSumTotal());
-               assertEquals("Total batch", 0, testConsumer.getBatchCount());
-               assertNull("Event not in sequnce", testConsumer.isInSequence());
-       }
-
-       //@Test
-       public void testAuditBatchProcessorDestDownFlipFlop() {
-               logger.debug("testAuditBatchProcessorDestDown()...");
-               int messageToSend = 10;
-
-               String basePropName = "ranger.test.batch";
-               int batchSize = messageToSend / 3;
-               int expectedBatchSize = batchSize
-                               + (batchSize * 3 < messageToSend ? 1 : 0);
-               int queueSize = messageToSend * 2;
-               int intervalMS = 3000; // Deliberately big interval
-               Properties props = new Properties();
-               props.put(
-                               basePropName + "." + 
BaseAuditProvider.PROP_NAME,
-                               "testAuditBatchProcessorDestDownFlipFlop_"
-                                               + MiscUtil.generateUniqueId());
-
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_BATCH_SIZE, ""
-                               + batchSize);
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_QUEUE_SIZE, ""
-                               + queueSize);
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_BATCH_INTERVAL,
-                               "" + intervalMS);
-
-               // Enable File Spooling
-               int destRetryMS = 10;
-               props.put(
-                               basePropName + "." + 
BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
-                               "" + true);
-               props.put(
-                               basePropName + "." + 
AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
-                               "target");
-               props.put(basePropName + "."
-                               + AuditFileSpool.PROP_FILE_SPOOL_DEST_RETRY_MS, 
""
-                               + destRetryMS);
-
-               TestConsumer testConsumer = new TestConsumer();
-               testConsumer.isDown = false;
-
-               AuditBatchProcessor queue = new 
AuditBatchProcessor(testConsumer);
-               queue.init(props, basePropName);
-               queue.start();
-
-               try {
-                       queue.log(createEvent());
-                       queue.log(createEvent());
-                       queue.log(createEvent());
-                       Thread.sleep(1000);
-                       testConsumer.isDown = true;
-                       Thread.sleep(1000);
-                       queue.log(createEvent());
-                       queue.log(createEvent());
-                       queue.log(createEvent());
-                       Thread.sleep(1000);
-                       testConsumer.isDown = false;
-                       Thread.sleep(1000);
-                       queue.log(createEvent());
-                       queue.log(createEvent());
-                       queue.log(createEvent());
-                       Thread.sleep(1000);
-                       testConsumer.isDown = true;
-                       Thread.sleep(1000);
-                       queue.log(createEvent());
-                       Thread.sleep(1000);
-                       testConsumer.isDown = false;
-                       Thread.sleep(1000);
-               } catch (InterruptedException e) {
-                       // ignore
-               }
-               // Let's wait for second
-               try {
-                       Thread.sleep(2000);
-               } catch (InterruptedException e) {
-                       // ignore
-               }
-
-               queue.waitToComplete(5000);
-               queue.stop();
-               queue.waitToComplete();
-
-               assertEquals("Total count", messageToSend, 
testConsumer.getCountTotal());
-               assertEquals("Total sum", messageToSend, 
testConsumer.getSumTotal());
-               assertNull("Event not in sequnce", testConsumer.isInSequence());
-
-       }
-
-       /**
-        * See if we recover after restart
-        */
-       public void testAuditBatchProcessorDestDownRestart() {
-               logger.debug("testAuditBatchProcessorDestDownRestart()...");
-               int messageToSend = 10;
-
-               String basePropName = "ranger.test.batch";
-               int batchSize = messageToSend / 3;
-               int queueSize = messageToSend * 2;
-               int intervalMS = 3000; // Deliberately big interval
-               int maxArchivedFiles = 1;
-               Properties props = new Properties();
-               props.put(
-                               basePropName + "." + 
BaseAuditProvider.PROP_NAME,
-                               "testAuditBatchProcessorDestDownRestart_"
-                                               + MiscUtil.generateUniqueId());
-
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_BATCH_SIZE, ""
-                               + batchSize);
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_QUEUE_SIZE, ""
-                               + queueSize);
-               props.put(basePropName + "." + 
BaseAuditProvider.PROP_BATCH_INTERVAL,
-                               "" + intervalMS);
-
-               // Enable File Spooling
-               int destRetryMS = 10;
-               props.put(
-                               basePropName + "." + 
BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
-                               "" + true);
-               props.put(
-                               basePropName + "." + 
AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
-                               "target");
-               props.put(basePropName + "."
-                               + AuditFileSpool.PROP_FILE_SPOOL_DEST_RETRY_MS, 
""
-                               + destRetryMS);
-               props.put(basePropName + "."
-                               + 
AuditFileSpool.PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, ""
-                               + maxArchivedFiles);
-
-               TestConsumer testConsumer = new TestConsumer();
-               testConsumer.isDown = true;
-
-               AuditBatchProcessor queue = new 
AuditBatchProcessor(testConsumer);
-               queue.init(props, basePropName);
-               queue.start();
-
-               for (int i = 0; i < messageToSend; i++) {
-                       queue.log(createEvent());
-
-               }
-               // Let's wait for second or two
-               try {
-                       Thread.sleep(2000);
-               } catch (InterruptedException e) {
-                       // ignore
-               }
-
-               queue.waitToComplete(5000);
-               queue.stop();
-               queue.waitToComplete();
-
-               testConsumer.isDown = true;
-
-               // Let's wait for second or two
-               try {
-                       Thread.sleep(5000);
-               } catch (InterruptedException e) {
-                       // ignore
-               }
-               
-               
-               // Let's now recreate the objects
-               testConsumer = new TestConsumer();
-
-               queue = new AuditBatchProcessor(testConsumer);
-               queue.init(props, basePropName);
-               queue.start();
-
-               // Let's wait for second
-               try {
-                       Thread.sleep(2000);
-               } catch (InterruptedException e) {
-                       // ignore
-               }
-
-               queue.waitToComplete(5000);
-               queue.stop();
-               queue.waitToComplete();
-
-               assertEquals("Total count", messageToSend, 
testConsumer.getCountTotal());
-               assertEquals("Total sum", messageToSend, 
testConsumer.getSumTotal());
-               assertNull("Event not in sequnce", testConsumer.isInSequence());
-
-       }
-
-       @Test
-       public void testFileDestination() {
-               logger.debug("testFileDestination()...");
-
-               int messageToSend = 10;
-               int batchSize = messageToSend / 3;
-               int queueSize = messageToSend * 2;
-               int intervalMS = 500; // Should be less than final sleep time
-
-               String logFolderName = "target/testFileDestination";
-               File logFolder = new File(logFolderName);
-               String logFileName = "test_ranger_audit.log";
-               File logFile = new File(logFolder, logFileName);
-
-               Properties props = new Properties();
-               // Destination
-               String filePropPrefix = AuditProviderFactory.AUDIT_DEST_BASE + 
".file";
-               props.put(filePropPrefix, "enable");
-               props.put(filePropPrefix + "." + BaseAuditProvider.PROP_NAME, 
"file");
-               props.put(filePropPrefix + "."
-                               + FileAuditDestination.PROP_FILE_LOCAL_DIR, 
logFolderName);
-               props.put(filePropPrefix + "."
-                               + 
FileAuditDestination.PROP_FILE_LOCAL_FILE_NAME_FORMAT,
-                               "%app-type%_ranger_audit.log");
-               props.put(filePropPrefix + "."
-                               + FileAuditDestination.PROP_FILE_FILE_ROLLOVER, 
"" + 10);
-
-               props.put(filePropPrefix + "." + BaseAuditProvider.PROP_QUEUE, 
"batch");
-               String batchPropPrefix = filePropPrefix + "." + "batch";
-
-               props.put(batchPropPrefix + "." + 
BaseAuditProvider.PROP_BATCH_SIZE, ""
-                               + batchSize);
-               props.put(batchPropPrefix + "." + 
BaseAuditProvider.PROP_QUEUE_SIZE, ""
-                               + queueSize);
-               props.put(
-                               batchPropPrefix + "." + 
BaseAuditProvider.PROP_BATCH_INTERVAL,
-                               "" + intervalMS);
-
-               // Enable File Spooling
-               int destRetryMS = 10;
-               props.put(batchPropPrefix + "."
-                               + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE, "" 
+ true);
-               props.put(batchPropPrefix + "."
-                               + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR, 
"target");
-               props.put(batchPropPrefix + "."
-                               + AuditFileSpool.PROP_FILE_SPOOL_DEST_RETRY_MS, 
""
-                               + destRetryMS);
-
-               AuditProviderFactory factory = 
AuditProviderFactory.getInstance();
-               factory.init(props, "test");
-
-               // FileAuditDestination fileDest = new FileAuditDestination();
-               // fileDest.init(props, filePropPrefix);
-               //
-               // AuditBatchProcessor queue = new 
AuditBatchProcessor(fileDest);
-               // queue.init(props, batchPropPrefix);
-               // queue.start();
-
-               AuditProvider queue = factory.getProvider();
-
-               for (int i = 0; i < messageToSend; i++) {
-                       queue.log(createEvent());
-               }
-               // Let's wait for second
-               try {
-                       Thread.sleep(1000);
-               } catch (InterruptedException e) {
-                       // ignore
-               }
-
-               queue.waitToComplete();
-               queue.stop();
-               queue.waitToComplete();
-
-               assertTrue("File created", logFile.exists());
-               try {
-                       List<AuthzAuditEvent> eventList = new 
ArrayList<AuthzAuditEvent>();
-                       int totalSum = 0;
-                       BufferedReader br = new BufferedReader(new 
FileReader(logFile));
-                       String line;
-                       int lastSeq = -1;
-                       boolean outOfSeq = false;
-                       while ((line = br.readLine()) != null) {
-                               AuthzAuditEvent event = MiscUtil.fromJson(line,
-                                               AuthzAuditEvent.class);
-                               eventList.add(event);
-                               totalSum += event.getFrequencyCount();
-                               if (event.getSeqNum() <= lastSeq) {
-                                       outOfSeq = true;
-                               }
-                       }
-                       br.close();
-                       assertEquals("Total count", messageToSend, 
eventList.size());
-                       assertEquals("Total sum", messageToSend, totalSum);
-                       assertFalse("Event not in sequnce", outOfSeq);
-
-               } catch (Throwable e) {
-                       logger.error("Error opening file for reading.", e);
-                       assertTrue("Error reading file. fileName=" + logFile + 
", error="
-                                       + e.toString(), true);
-               }
-
-       }
-
-       private AuthzAuditEvent createEvent() {
-               AuthzAuditEvent event = new AuthzAuditEvent();
-               event.setSeqNum(++seqNum);
-               return event;
-       }
-
-       class TestConsumer extends AuditDestination {
-
-               int countTotal = 0;
-               int sumTotal = 0;
-               int batchCount = 0;
-               String providerName = getClass().getName();
-               boolean isDown = false;
-               int batchSize = 3;
-
-               List<AuthzAuditEvent> eventList = new 
ArrayList<AuthzAuditEvent>();
-
-               /*
-                * (non-Javadoc)
-                * 
-                * @see
-                * 
org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger
-                * .audit.model.AuditEventBase)
-                */
-               @Override
-               public boolean log(AuditEventBase event) {
-                       if (isDown) {
-                               return false;
-                       }
-                       countTotal++;
-                       if (event instanceof AuthzAuditEvent) {
-                               AuthzAuditEvent azEvent = (AuthzAuditEvent) 
event;
-                               sumTotal += azEvent.getFrequencyCount();
-                               logger.info("EVENT:" + event);
-                               eventList.add(azEvent);
-                       }
-                       return true;
-               }
-
-               @Override
-               public boolean log(Collection<AuditEventBase> events) {
-                       if (isDown) {
-                               return false;
-                       }
-                       batchCount++;
-                       for (AuditEventBase event : events) {
-                               log(event);
-                       }
-                       return true;
-               }
-
-               @Override
-               public boolean logJSON(String jsonStr) {
-                       if (isDown) {
-                               return false;
-                       }
-                       countTotal++;
-                       AuthzAuditEvent event = MiscUtil.fromJson(jsonStr,
-                                       AuthzAuditEvent.class);
-                       sumTotal += event.getFrequencyCount();
-                       logger.info("JSON:" + jsonStr);
-                       eventList.add(event);
-                       return true;
-               }
-
-               @Override
-               public boolean logJSON(Collection<String> events) {
-                       if (isDown) {
-                               return false;
-                       }
-                       for (String event : events) {
-                               logJSON(event);
-                       }
-                       return true;
-               }
-
-               /*
-                * (non-Javadoc)
-                * 
-                * @see
-                * 
org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties
-                * )
-                */
-               @Override
-               public void init(Properties prop) {
-                       // Nothing to do here
-               }
-
-               /*
-                * (non-Javadoc)
-                * 
-                * @see org.apache.ranger.audit.provider.AuditProvider#start()
-                */
-               @Override
-               public void start() {
-                       // Nothing to do here
-               }
-
-               /*
-                * (non-Javadoc)
-                * 
-                * @see org.apache.ranger.audit.provider.AuditProvider#stop()
-                */
-               @Override
-               public void stop() {
-                       // Nothing to do here
-               }
-
-               /*
-                * (non-Javadoc)
-                * 
-                * @see 
org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
-                */
-               @Override
-               public void waitToComplete() {
-               }
-
-               @Override
-               public int getMaxBatchSize() {
-                       return batchSize;
-               }
-
-               /*
-                * (non-Javadoc)
-                * 
-                * @see 
org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
-                */
-               @Override
-               public boolean isFlushPending() {
-                       return false;
-               }
-
-               /*
-                * (non-Javadoc)
-                * 
-                * @see
-                * 
org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
-                */
-               @Override
-               public long getLastFlushTime() {
-                       return 0;
-               }
-
-               /*
-                * (non-Javadoc)
-                * 
-                * @see org.apache.ranger.audit.provider.AuditProvider#flush()
-                */
-               @Override
-               public void flush() {
-                       // Nothing to do here
-               }
-
-               public int getCountTotal() {
-                       return countTotal;
-               }
-
-               public int getSumTotal() {
-                       return sumTotal;
-               }
-
-               public int getBatchCount() {
-                       return batchCount;
-               }
-
-               /*
-                * (non-Javadoc)
-                * 
-                * @see
-                * 
org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties
-                * , java.lang.String)
-                */
-               @Override
-               public void init(Properties prop, String basePropertyName) {
-
-               }
-
-               /*
-                * (non-Javadoc)
-                * 
-                * @see
-                * 
org.apache.ranger.audit.provider.AuditProvider#waitToComplete(long)
-                */
-               @Override
-               public void waitToComplete(long timeout) {
-
-               }
-
-               /*
-                * (non-Javadoc)
-                * 
-                * @see org.apache.ranger.audit.provider.AuditProvider#getName()
-                */
-               @Override
-               public String getName() {
-                       return providerName;
-               }
-
-               /*
-                * (non-Javadoc)
-                * 
-                * @see org.apache.ranger.audit.provider.AuditProvider#isDrain()
-                */
-               @Override
-               public boolean isDrain() {
-                       return false;
-               }
-
-               // Local methods
-               public AuthzAuditEvent isInSequence() {
-                       int lastSeq = -1;
-                       for (AuthzAuditEvent event : eventList) {
-                               if (event.getSeqNum() <= lastSeq) {
-                                       return event;
-                               }
-                       }
-                       return null;
-               }
-       }
-}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java
----------------------------------------------------------------------
diff --git 
a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java 
b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java
new file mode 100644
index 0000000..45477e2
--- /dev/null
+++ b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java
@@ -0,0 +1,704 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit;
+
+import static org.junit.Assert.*;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileReader;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.FileAuditDestination;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.audit.provider.AuditProvider;
+import org.apache.ranger.audit.provider.AuditProviderFactory;
+import org.apache.ranger.audit.provider.BaseAuditProvider;
+import org.apache.ranger.audit.provider.MiscUtil;
+import org.apache.ranger.audit.provider.MultiDestAuditProvider;
+import org.apache.ranger.audit.queue.AuditAsyncQueue;
+import org.apache.ranger.audit.queue.AuditBatchQueue;
+import org.apache.ranger.audit.queue.AuditFileSpool;
+import org.apache.ranger.audit.queue.AuditSummaryQueue;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+public class TestAuditQueue {
+
+       /** Commons-logging logger shared by every test in this class. */
+       private static final Log logger = LogFactory
+                       .getLog(TestAuditQueue.class);
+
+       /**
+        * Class-level set-up hook. Currently a no-op: each test builds its own
+        * queue, consumer and properties.
+        */
+       @BeforeClass
+       public static void setUpBeforeClass() throws Exception {
+               // nothing to prepare
+       }
+
+       /**
+        * Class-level tear-down hook. Currently a no-op.
+        */
+       @AfterClass
+       public static void tearDownAfterClass() throws Exception {
+               // nothing to release
+       }
+
+       /**
+        * Last sequence number handed out by the createEvent() helpers. It is
+        * static, so numbering continues across the tests of this class.
+        */
+       private static int seqNum = 0;
+
+       /**
+        * Sends a small burst of events through an {@link AuditAsyncQueue} and,
+        * after stopping the queue, checks the totals recorded by the consumer
+        * and that no event was reported out of sequence.
+        */
+       @Test
+       public void testAuditAsyncQueue() {
+               logger.debug("testAuditAsyncQueue()...");
+               final int messageToSend = 10;
+
+               TestConsumer testConsumer = new TestConsumer();
+               AuditAsyncQueue queue = new AuditAsyncQueue(testConsumer);
+               queue.init(new Properties());
+               queue.start();
+
+               int sent = 0;
+               while (sent < messageToSend) {
+                       queue.log(createEvent());
+                       sent++;
+               }
+               queue.stop();
+               queue.waitToComplete();
+
+               // One-second grace period before inspecting the consumer
+               try {
+                       Thread.sleep(1000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+
+               assertEquals(messageToSend, testConsumer.getCountTotal());
+               assertEquals(messageToSend, testConsumer.getSumTotal());
+               assertNull("Event not in sequnce", testConsumer.isInSequence());
+       }
+
+       /**
+        * Runs the shared summary scenario against a directly constructed
+        * {@link AuditSummaryQueue} configured with a summary interval of 300.
+        */
+       @Test
+       public void testAuditSummaryQueue() {
+               logger.debug("testAuditSummaryQueue()...");
+
+               TestConsumer testConsumer = new TestConsumer();
+               AuditSummaryQueue queue = new AuditSummaryQueue(testConsumer);
+
+               String intervalKey = BaseAuditProvider.PROP_DEFAULT_PREFIX + "."
+                               + AuditSummaryQueue.PROP_SUMMARY_INTERVAL;
+               Properties props = new Properties();
+               props.put(intervalKey, "300");
+
+               queue.init(props, BaseAuditProvider.PROP_DEFAULT_PREFIX);
+               queue.start();
+
+               commonTestSummary(testConsumer, queue);
+       }
+
+       /**
+        * Shared scenario for the summary tests. Logs three rounds of three
+        * events, pausing 330 ms after each round, then drains and stops the
+        * queue and verifies the consumer.
+        * <p>
+        * Expectations encoded below: the consumer's sum total equals the number
+        * of events logged (9), while its count total is 7 - in the first two
+        * rounds the two identical "john" events are expected to show up as a
+        * single entry; in the last round the two "john" events differ in their
+        * access result and are expected as separate entries.
+        *
+        * @param testConsumer
+        *            consumer whose totals are verified
+        * @param queue
+        *            already started provider the events are logged to
+        */
+       private void commonTestSummary(TestConsumer testConsumer,
+                       BaseAuditProvider queue) {
+               final String resource = "xademo/customer_details/imei";
+               final int pauseMS = 330;
+
+               int messageToSend = 0;
+               int countToCheck = 0;
+               try {
+                       // Rounds 1 and 2: john, john (same event), jane
+                       for (int round = 0; round < 2; round++) {
+                               queue.log(createEvent("john", "select", resource, true));
+                               queue.log(createEvent("john", "select", resource, true));
+                               queue.log(createEvent("jane", "select", resource, true));
+                               messageToSend += 3;
+                               countToCheck += 2;
+                               Thread.sleep(pauseMS);
+                       }
+
+                       // Round 3: john allowed, john denied, jane
+                       queue.log(createEvent("john", "select", resource, true));
+                       queue.log(createEvent("john", "select", resource, false));
+                       queue.log(createEvent("jane", "select", resource, true));
+                       messageToSend += 3;
+                       countToCheck += 3;
+                       Thread.sleep(pauseMS);
+               } catch (InterruptedException e1) {
+                       logger.error("Sleep interupted", e1);
+               }
+
+               // Give the queue a second before draining it
+               try {
+                       Thread.sleep(1000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+
+               queue.waitToComplete();
+               queue.stop();
+               queue.waitToComplete();
+
+               // And another second after the stop
+               try {
+                       Thread.sleep(1000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+
+               assertEquals(messageToSend, testConsumer.getSumTotal());
+               assertEquals(countToCheck, testConsumer.getCountTotal());
+               assertNull("Event not in sequnce", testConsumer.isInSequence());
+       }
+
+       /**
+        * Runs the shared summary scenario against a provider chain built by
+        * {@link AuditProviderFactory} from properties, with {@link TestConsumer}
+        * configured as the destination class. The end of the consumer chain
+        * must be a TestConsumer.
+        */
+       @Test
+       public void testAuditSummaryByInfra() {
+               logger.debug("testAuditSummaryByInfra()...");
+
+               final String defaultPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX;
+               // Destination
+               final String propPrefix = AuditProviderFactory.AUDIT_DEST_BASE
+                               + ".test";
+
+               Properties props = new Properties();
+               props.put(propPrefix, "enable");
+               props.put(defaultPrefix + ".summary.enabled", "true");
+               props.put(propPrefix + "." + BaseAuditProvider.PROP_NAME, "test");
+               props.put(propPrefix + "." + BaseAuditProvider.PROP_QUEUE, "none");
+               props.put(defaultPrefix + "."
+                               + AuditSummaryQueue.PROP_SUMMARY_INTERVAL, "300");
+               props.put(propPrefix + "." + BaseAuditProvider.PROP_CLASS_NAME,
+                               TestConsumer.class.getName());
+
+               AuditProviderFactory factory = AuditProviderFactory.getInstance();
+               factory.init(props, "test");
+               BaseAuditProvider queue = (BaseAuditProvider) factory.getProvider();
+
+               // Follow the consumer links down to the last provider in the chain
+               BaseAuditProvider consumer = (BaseAuditProvider) queue.getConsumer();
+               while (consumer.getConsumer() != null) {
+                       consumer = (BaseAuditProvider) consumer.getConsumer();
+               }
+
+               String wrongTypeMsg = "Consumer should be TestConsumer. class="
+                               + consumer.getClass().getName();
+               assertTrue(wrongTypeMsg, consumer instanceof TestConsumer);
+
+               commonTestSummary((TestConsumer) consumer, queue);
+       }
+
+       /**
+        * Fans one {@link AuditAsyncQueue} out to several consumers through a
+        * {@link MultiDestAuditProvider} and checks that every consumer recorded
+        * all of the events.
+        */
+       @Test
+       public void testMultipleQueue() {
+               // Log this test's own name (was a copy of the async-queue message)
+               logger.debug("testMultipleQueue()...");
+               int destCount = 3;
+               TestConsumer[] testConsumer = new TestConsumer[destCount];
+
+               MultiDestAuditProvider multiQueue = new MultiDestAuditProvider();
+               for (int i = 0; i < destCount; i++) {
+                       testConsumer[i] = new TestConsumer();
+                       multiQueue.addAuditProvider(testConsumer[i]);
+               }
+
+               AuditAsyncQueue queue = new AuditAsyncQueue(multiQueue);
+               Properties props = new Properties();
+               queue.init(props);
+               queue.start();
+
+               int messageToSend = 10;
+               for (int i = 0; i < messageToSend; i++) {
+                       queue.log(createEvent());
+               }
+               queue.stop();
+               queue.waitToComplete();
+               // Wait one second before inspecting the consumers
+               try {
+                       Thread.sleep(1000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+               for (int i = 0; i < destCount; i++) {
+                       assertEquals("consumer" + i, messageToSend,
+                                       testConsumer[i].getCountTotal());
+                       assertEquals("consumer" + i, messageToSend,
+                                       testConsumer[i].getSumTotal());
+               }
+       }
+
+       /**
+        * Batching driven by batch size: the batch interval is deliberately
+        * large, 10 events are logged with a batch size of 3, and the consumer
+        * is expected to have seen all events in 4 batches (3 + 3 + 3 + 1).
+        */
+       @Test
+       public void testAuditBatchQueueBySize() {
+               // Log the full method name so this test can be told apart from
+               // testAuditBatchQueueByTime in the debug output
+               logger.debug("testAuditBatchQueueBySize()...");
+               int messageToSend = 10;
+
+               String basePropName = "testAuditBatchQueueBySize_"
+                               + MiscUtil.generateUniqueId();
+               int batchSize = messageToSend / 3;
+               // One extra batch when the events do not divide evenly by three
+               int expectedBatchSize = batchSize
+                               + (batchSize * 3 < messageToSend ? 1 : 0);
+               int queueSize = messageToSend * 2;
+               int intervalMS = messageToSend * 100; // Deliberately big interval
+               Properties props = new Properties();
+               props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE,
+                               "" + batchSize);
+               props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE,
+                               "" + queueSize);
+               props.put(basePropName + "."
+                               + BaseAuditProvider.PROP_BATCH_INTERVAL, "" + intervalMS);
+
+               TestConsumer testConsumer = new TestConsumer();
+               AuditBatchQueue queue = new AuditBatchQueue(testConsumer);
+               queue.init(props, basePropName);
+               queue.start();
+
+               for (int i = 0; i < messageToSend; i++) {
+                       queue.log(createEvent());
+               }
+               // Wait two seconds before draining the queue
+               try {
+                       Thread.sleep(2000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+
+               queue.waitToComplete();
+               queue.stop();
+               queue.waitToComplete();
+
+               assertEquals("Total count", messageToSend,
+                               testConsumer.getCountTotal());
+               assertEquals("Total sum", messageToSend, testConsumer.getSumTotal());
+               assertEquals("Total batch", expectedBatchSize,
+                               testConsumer.getBatchCount());
+               assertNull("Event not in sequnce", testConsumer.isInSequence());
+       }
+
+       /**
+        * Batching driven by the batch interval: the batch size is deliberately
+        * larger than the number of events, events are logged 103 ms apart with
+        * a 300 ms batch interval, and the consumer is expected to have seen all
+        * events in (10 * 103) / 300 + 1 = 4 batches.
+        */
+       @Test
+       public void testAuditBatchQueueByTime() {
+               // Log the full method name so this test can be told apart from
+               // testAuditBatchQueueBySize in the debug output
+               logger.debug("testAuditBatchQueueByTime()...");
+
+               int messageToSend = 10;
+
+               String basePropName = "testAuditBatchQueueByTime_"
+                               + MiscUtil.generateUniqueId();
+               int batchSize = messageToSend * 2; // Deliberately big size
+               int queueSize = messageToSend * 2;
+               // e.g. (1000 / 10) * 3 = 300 ms
+               int intervalMS = (1000 / messageToSend) * 3;
+               // e.g. 1000 / 10 + 3 = 103 ms
+               int pauseMS = 1000 / messageToSend + 3;
+               int expectedBatchSize = (messageToSend * pauseMS) / intervalMS + 1;
+
+               Properties props = new Properties();
+               props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE,
+                               "" + batchSize);
+               props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE,
+                               "" + queueSize);
+               props.put(basePropName + "."
+                               + BaseAuditProvider.PROP_BATCH_INTERVAL, "" + intervalMS);
+
+               TestConsumer testConsumer = new TestConsumer();
+               AuditBatchQueue queue = new AuditBatchQueue(testConsumer);
+               queue.init(props, basePropName);
+               queue.start();
+
+               for (int i = 0; i < messageToSend; i++) {
+                       queue.log(createEvent());
+                       try {
+                               Thread.sleep(pauseMS);
+                       } catch (InterruptedException e) {
+                               // ignore
+                       }
+               }
+               // Wait two seconds before draining the queue
+               try {
+                       Thread.sleep(2000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+               queue.waitToComplete();
+               queue.stop();
+               queue.waitToComplete();
+
+               assertEquals("Total count", messageToSend,
+                               testConsumer.getCountTotal());
+               assertEquals("Total sum", messageToSend, testConsumer.getSumTotal());
+               assertEquals("Total batch", expectedBatchSize,
+                               testConsumer.getBatchCount());
+               assertNull("Event not in sequnce", testConsumer.isInSequence());
+       }
+
+       /**
+        * Destination permanently down: the consumer is flagged as down before
+        * the queue starts and file spooling is enabled into "target". After the
+        * queue is stopped the consumer must not have recorded any event or
+        * batch.
+        */
+       @Test
+       public void testAuditBatchQueueDestDown() {
+               logger.debug("testAuditBatchQueueDestDown()...");
+               final int messageToSend = 10;
+
+               final String basePropName = "testAuditBatchQueueDestDown_"
+                               + MiscUtil.generateUniqueId();
+               final String keyPrefix = basePropName + ".";
+               final int batchSize = messageToSend / 3;
+               final int queueSize = messageToSend * 2;
+               final int intervalMS = Integer.MAX_VALUE; // Deliberately big interval
+
+               Properties props = new Properties();
+               props.put(keyPrefix + BaseAuditProvider.PROP_NAME,
+                               "testAuditBatchQueueDestDown");
+               props.put(keyPrefix + BaseAuditProvider.PROP_BATCH_SIZE,
+                               String.valueOf(batchSize));
+               props.put(keyPrefix + BaseAuditProvider.PROP_QUEUE_SIZE,
+                               String.valueOf(queueSize));
+               props.put(keyPrefix + BaseAuditProvider.PROP_BATCH_INTERVAL,
+                               String.valueOf(intervalMS));
+
+               // Enable File Spooling
+               props.put(keyPrefix + "filespool.enable", "true");
+               props.put(keyPrefix + "filespool.dir", "target");
+
+               TestConsumer testConsumer = new TestConsumer();
+               testConsumer.isDown = true;
+
+               AuditBatchQueue queue = new AuditBatchQueue(testConsumer);
+               queue.init(props, basePropName);
+               queue.start();
+
+               int sent = 0;
+               while (sent < messageToSend) {
+                       queue.log(createEvent());
+                       sent++;
+               }
+
+               // Two-second pause before draining
+               try {
+                       Thread.sleep(2000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+
+               queue.waitToComplete(5000);
+               queue.stop();
+               queue.waitToComplete();
+
+               assertEquals("Total count", 0, testConsumer.getCountTotal());
+               assertEquals("Total sum", 0, testConsumer.getSumTotal());
+               assertEquals("Total batch", 0, testConsumer.getBatchCount());
+               assertNull("Event not in sequnce", testConsumer.isInSequence());
+       }
+
+       /**
+        * Destination toggling between up and down while events are logged
+        * (3 + 3 + 3 + 1 = 10 events, with one-second pauses around every
+        * toggle). File spooling is enabled with a 10 ms destination retry, and
+        * at the end the consumer must have recorded all events, in sequence.
+        */
+       @Test
+       public void testAuditBatchQueueDestDownFlipFlop() {
+               logger.debug("testAuditBatchQueueDestDownFlipFlop()...");
+               final int messageToSend = 10;
+
+               final String basePropName = "testAuditBatchQueueDestDownFlipFlop_"
+                               + MiscUtil.generateUniqueId();
+               final String keyPrefix = basePropName + ".";
+               final int batchSize = messageToSend / 3;
+               final int queueSize = messageToSend * 2;
+               final int intervalMS = 3000; // Deliberately big interval
+
+               Properties props = new Properties();
+               props.put(keyPrefix + BaseAuditProvider.PROP_NAME,
+                               "testAuditBatchQueueDestDownFlipFlop_"
+                                               + MiscUtil.generateUniqueId());
+               props.put(keyPrefix + BaseAuditProvider.PROP_BATCH_SIZE,
+                               String.valueOf(batchSize));
+               props.put(keyPrefix + BaseAuditProvider.PROP_QUEUE_SIZE,
+                               String.valueOf(queueSize));
+               props.put(keyPrefix + BaseAuditProvider.PROP_BATCH_INTERVAL,
+                               String.valueOf(intervalMS));
+
+               // Enable File Spooling
+               final int destRetryMS = 10;
+               props.put(keyPrefix + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
+                               "true");
+               props.put(keyPrefix + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
+                               "target");
+               props.put(keyPrefix + AuditFileSpool.PROP_FILE_SPOOL_DEST_RETRY_MS,
+                               String.valueOf(destRetryMS));
+
+               TestConsumer testConsumer = new TestConsumer();
+               testConsumer.isDown = false;
+
+               AuditBatchQueue queue = new AuditBatchQueue(testConsumer);
+               queue.init(props, basePropName);
+               queue.start();
+
+               try {
+                       // Destination up: three events
+                       for (int i = 0; i < 3; i++) {
+                               queue.log(createEvent());
+                       }
+                       Thread.sleep(1000);
+
+                       // Destination down: three events
+                       testConsumer.isDown = true;
+                       Thread.sleep(1000);
+                       for (int i = 0; i < 3; i++) {
+                               queue.log(createEvent());
+                       }
+                       Thread.sleep(1000);
+
+                       // Destination up again: three events
+                       testConsumer.isDown = false;
+                       Thread.sleep(1000);
+                       for (int i = 0; i < 3; i++) {
+                               queue.log(createEvent());
+                       }
+                       Thread.sleep(1000);
+
+                       // Destination down: one last event
+                       testConsumer.isDown = true;
+                       Thread.sleep(1000);
+                       queue.log(createEvent());
+                       Thread.sleep(1000);
+
+                       // Destination up for the final drain
+                       testConsumer.isDown = false;
+                       Thread.sleep(1000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+
+               // Two-second pause before draining
+               try {
+                       Thread.sleep(2000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+
+               queue.waitToComplete(5000);
+               queue.stop();
+               queue.waitToComplete();
+
+               assertEquals("Total count", messageToSend,
+                               testConsumer.getCountTotal());
+               assertEquals("Total sum", messageToSend, testConsumer.getSumTotal());
+               assertNull("Event not in sequnce", testConsumer.isInSequence());
+       }
+
+       /**
+        * Checks recovery across a restart. In the first phase the events are
+        * logged while the consumer is down (file spooling enabled) and the
+        * queue is stopped. In the second phase a fresh consumer and a fresh
+        * queue are created from the same properties; that new consumer is
+        * expected to end up with all of the events, in sequence.
+        */
+       @Test
+       public void testAuditBatchQueueDestDownRestart() {
+               logger.debug("testAuditBatchQueueDestDownRestart()...");
+               final int messageToSend = 10;
+
+               final String basePropName = "testAuditBatchQueueDestDownRestart_"
+                               + MiscUtil.generateUniqueId();
+               final String keyPrefix = basePropName + ".";
+               final int batchSize = messageToSend / 3;
+               final int queueSize = messageToSend * 2;
+               final int intervalMS = 3000; // Deliberately big interval
+               final int maxArchivedFiles = 1;
+
+               Properties props = new Properties();
+               props.put(keyPrefix + BaseAuditProvider.PROP_NAME,
+                               "testAuditBatchQueueDestDownRestart_"
+                                               + MiscUtil.generateUniqueId());
+               props.put(keyPrefix + BaseAuditProvider.PROP_BATCH_SIZE,
+                               String.valueOf(batchSize));
+               props.put(keyPrefix + BaseAuditProvider.PROP_QUEUE_SIZE,
+                               String.valueOf(queueSize));
+               props.put(keyPrefix + BaseAuditProvider.PROP_BATCH_INTERVAL,
+                               String.valueOf(intervalMS));
+
+               // Enable File Spooling
+               final int destRetryMS = 10;
+               props.put(keyPrefix + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE,
+                               "true");
+               props.put(keyPrefix + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR,
+                               "target");
+               props.put(keyPrefix + AuditFileSpool.PROP_FILE_SPOOL_DEST_RETRY_MS,
+                               String.valueOf(destRetryMS));
+               props.put(keyPrefix
+                               + AuditFileSpool.PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT,
+                               String.valueOf(maxArchivedFiles));
+
+               // Phase 1: destination is down while the events are logged
+               TestConsumer testConsumer = new TestConsumer();
+               testConsumer.isDown = true;
+
+               AuditBatchQueue queue = new AuditBatchQueue(testConsumer);
+               queue.init(props, basePropName);
+               queue.start();
+
+               int sent = 0;
+               while (sent < messageToSend) {
+                       queue.log(createEvent());
+                       sent++;
+               }
+
+               // Two-second pause before stopping the first queue
+               try {
+                       Thread.sleep(2000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+
+               queue.waitToComplete(5000);
+               queue.stop();
+               queue.waitToComplete();
+
+               // Flag was already true; assignment kept as in the original flow
+               testConsumer.isDown = true;
+
+               // Five-second pause between the two phases
+               try {
+                       Thread.sleep(5000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+
+               // Phase 2: recreate consumer and queue from the same properties
+               testConsumer = new TestConsumer();
+
+               queue = new AuditBatchQueue(testConsumer);
+               queue.init(props, basePropName);
+               queue.start();
+
+               // Two-second pause before stopping the second queue
+               try {
+                       Thread.sleep(2000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+
+               queue.waitToComplete(5000);
+               queue.stop();
+               queue.waitToComplete();
+
+               assertEquals("Total count", messageToSend,
+                               testConsumer.getCountTotal());
+               assertEquals("Total sum", messageToSend, testConsumer.getSumTotal());
+               assertNull("Event not in sequnce", testConsumer.isInSequence());
+       }
+
+       /**
+        * End-to-end check of the file destination configured through
+        * {@link AuditProviderFactory}: events are logged through a batch queue
+        * (file spooling enabled) into "target/testFileDestination", then the
+        * log file is read back line by line as JSON and the number of events,
+        * the sum of their event counts and their ordering are verified.
+        * <p>
+        * Verification failures are reported to JUnit: assertion errors are not
+        * caught, and any exception while reading the file fails the test.
+        */
+       @Test
+       public void testFileDestination() {
+               logger.debug("testFileDestination()...");
+
+               int messageToSend = 10;
+               int batchSize = messageToSend / 3;
+               int queueSize = messageToSend * 2;
+               int intervalMS = 500; // Should be less than final sleep time
+
+               String logFolderName = "target/testFileDestination";
+               File logFolder = new File(logFolderName);
+               String logFileName = "test_ranger_audit.log";
+               File logFile = new File(logFolder, logFileName);
+
+               Properties props = new Properties();
+               // Destination
+               String filePropPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".file";
+               props.put(filePropPrefix, "enable");
+               props.put(filePropPrefix + "." + BaseAuditProvider.PROP_NAME, "file");
+               props.put(filePropPrefix + "."
+                               + FileAuditDestination.PROP_FILE_LOCAL_DIR, logFolderName);
+               props.put(filePropPrefix + "."
+                               + FileAuditDestination.PROP_FILE_LOCAL_FILE_NAME_FORMAT,
+                               "%app-type%_ranger_audit.log");
+               props.put(filePropPrefix + "."
+                               + FileAuditDestination.PROP_FILE_FILE_ROLLOVER, "" + 10);
+
+               props.put(filePropPrefix + "." + BaseAuditProvider.PROP_QUEUE,
+                               "batch");
+               String batchPropPrefix = filePropPrefix + "." + "batch";
+
+               props.put(batchPropPrefix + "." + BaseAuditProvider.PROP_BATCH_SIZE,
+                               "" + batchSize);
+               props.put(batchPropPrefix + "." + BaseAuditProvider.PROP_QUEUE_SIZE,
+                               "" + queueSize);
+               props.put(batchPropPrefix + "."
+                               + BaseAuditProvider.PROP_BATCH_INTERVAL, "" + intervalMS);
+
+               // Enable File Spooling
+               int destRetryMS = 10;
+               props.put(batchPropPrefix + "."
+                               + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE, "" + true);
+               props.put(batchPropPrefix + "."
+                               + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR, "target");
+               props.put(batchPropPrefix + "."
+                               + AuditFileSpool.PROP_FILE_SPOOL_DEST_RETRY_MS, ""
+                               + destRetryMS);
+
+               AuditProviderFactory factory = AuditProviderFactory.getInstance();
+               factory.init(props, "test");
+
+               AuditProvider queue = factory.getProvider();
+
+               for (int i = 0; i < messageToSend; i++) {
+                       queue.log(createEvent());
+               }
+               // Wait one second (longer than the batch interval) before draining
+               try {
+                       Thread.sleep(1000);
+               } catch (InterruptedException e) {
+                       // ignore
+               }
+
+               queue.waitToComplete();
+               queue.stop();
+               queue.waitToComplete();
+
+               assertTrue("File created", logFile.exists());
+
+               List<AuthzAuditEvent> eventList = new ArrayList<AuthzAuditEvent>();
+               int totalSum = 0;
+               long lastSeq = -1;
+               boolean outOfSeq = false;
+               try {
+                       BufferedReader br = new BufferedReader(new FileReader(logFile));
+                       try {
+                               String line;
+                               while ((line = br.readLine()) != null) {
+                                       AuthzAuditEvent event = MiscUtil.fromJson(line,
+                                                       AuthzAuditEvent.class);
+                                       eventList.add(event);
+                                       totalSum += event.getEventCount();
+                                       if (event.getSeqNum() <= lastSeq) {
+                                               outOfSeq = true;
+                                       }
+                                       // Remember this event so the next one is compared
+                                       // against it; without this the check never fires
+                                       lastSeq = event.getSeqNum();
+                               }
+                       } finally {
+                               // Close the reader even when reading or parsing fails
+                               br.close();
+                       }
+               } catch (Exception e) {
+                       // Only exceptions are handled here; assertion errors are not
+                       // caught and a read/parse problem is an explicit failure
+                       logger.error("Error opening file for reading.", e);
+                       fail("Error reading file. fileName=" + logFile + ", error="
+                                       + e.toString());
+               }
+
+               assertEquals("Total count", messageToSend, eventList.size());
+               assertEquals("Total sum", messageToSend, totalSum);
+               assertFalse("Event not in sequnce", outOfSeq);
+       }
+
+       /**
+        * Creates an event carrying only the next sequence number.
+        *
+        * @return a new event whose sequence number is the incremented value of
+        *         the shared counter
+        */
+       private AuthzAuditEvent createEvent() {
+               AuthzAuditEvent created = new AuthzAuditEvent();
+               seqNum += 1;
+               created.setSeqNum(seqNum);
+               return created;
+       }
+
+       /**
+        * Creates an event with the given attributes and the next sequence
+        * number.
+        *
+        * @param user
+        *            value passed to setUser
+        * @param accessType
+        *            value passed to setAccessType
+        * @param resource
+        *            value passed to setResourcePath
+        * @param isAllowed
+        *            stored as access result 1 when true, 0 when false
+        * @return the populated event
+        */
+       private AuthzAuditEvent createEvent(String user, String accessType,
+                       String resource, boolean isAllowed) {
+               short accessResult = 0;
+               if (isAllowed) {
+                       accessResult = 1;
+               }
+
+               AuthzAuditEvent created = new AuthzAuditEvent();
+               created.setUser(user);
+               created.setAccessType(accessType);
+               created.setResourcePath(resource);
+               created.setAccessResult(accessResult);
+
+               seqNum += 1;
+               created.setSeqNum(seqNum);
+               return created;
+       }
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java
----------------------------------------------------------------------
diff --git 
a/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java 
b/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java
new file mode 100644
index 0000000..d4d50f0
--- /dev/null
+++ b/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.AuditDestination;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.model.AuthzAuditEvent;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+public class TestConsumer extends AuditDestination {
+       private static final Log logger = LogFactory.getLog(TestConsumer.class);
+
+       int countTotal = 0;
+       int sumTotal = 0;
+       int batchCount = 0;
+       String providerName = getClass().getName();
+       boolean isDown = false;
+       int batchSize = 3;
+
+       List<AuthzAuditEvent> eventList = new ArrayList<AuthzAuditEvent>();
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger
+        * .audit.model.AuditEventBase)
+        */
+       @Override
+       public boolean log(AuditEventBase event) {
+               if (isDown) {
+                       return false;
+               }
+               countTotal++;
+               if (event instanceof AuthzAuditEvent) {
+                       AuthzAuditEvent azEvent = (AuthzAuditEvent) event;
+                       sumTotal += azEvent.getEventCount();
+                       logger.info("EVENT:" + event);
+                       eventList.add(azEvent);
+               }
+               return true;
+       }
+
+       @Override
+       public boolean log(Collection<AuditEventBase> events) {
+               if (isDown) {
+                       return false;
+               }
+               batchCount++;
+               for (AuditEventBase event : events) {
+                       log(event);
+               }
+               return true;
+       }
+
+       @Override
+       public boolean logJSON(String jsonStr) {
+               if (isDown) {
+                       return false;
+               }
+               countTotal++;
+               AuthzAuditEvent event = MiscUtil.fromJson(jsonStr,
+                               AuthzAuditEvent.class);
+               sumTotal += event.getEventCount();
+               logger.info("JSON:" + jsonStr);
+               eventList.add(event);
+               return true;
+       }
+
+       @Override
+       public boolean logJSON(Collection<String> events) {
+               if (isDown) {
+                       return false;
+               }
+               batchCount++;
+               for (String event : events) {
+                       logJSON(event);
+               }
+               return true;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties
+        * )
+        */
+       @Override
+       public void init(Properties prop) {
+               // Nothing to do here
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#start()
+        */
+       @Override
+       public void start() {
+               // Nothing to do here
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#stop()
+        */
+       @Override
+       public void stop() {
+               // Nothing to do here
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete()
+        */
+       @Override
+       public void waitToComplete() {
+       }
+
+       @Override
+       public int getMaxBatchSize() {
+               return batchSize;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending()
+        */
+       @Override
+       public boolean isFlushPending() {
+               return false;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime()
+        */
+       @Override
+       public long getLastFlushTime() {
+               return 0;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#flush()
+        */
+       @Override
+       public void flush() {
+               // Nothing to do here
+       }
+
+       public int getCountTotal() {
+               return countTotal;
+       }
+
+       public int getSumTotal() {
+               return sumTotal;
+       }
+
+       public int getBatchCount() {
+               return batchCount;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties
+        * , java.lang.String)
+        */
+       @Override
+       public void init(Properties prop, String basePropertyName) {
+
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * org.apache.ranger.audit.provider.AuditProvider#waitToComplete(long)
+        */
+       @Override
+       public void waitToComplete(long timeout) {
+
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#getName()
+        */
+       @Override
+       public String getName() {
+               return providerName;
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see org.apache.ranger.audit.provider.AuditProvider#isDrain()
+        */
+       @Override
+       public boolean isDrain() {
+               return false;
+       }
+
+       // Local methods
+       public AuthzAuditEvent isInSequence() {
+               int lastSeq = -1;
+               for (AuthzAuditEvent event : eventList) {
+                       if (event.getSeqNum() <= lastSeq) {
+                               return event;
+                       }
+               }
+               return null;
+       }
+}

Reply via email to