http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java index e102d8b..3e1940b 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java @@ -25,26 +25,25 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; import java.util.Properties; -import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.audit.model.AuditEventBase; -import org.apache.ranger.audit.provider.AuditProvider; -import org.apache.ranger.audit.provider.BaseAuditProvider; +import org.apache.ranger.audit.provider.AuditHandler; import org.apache.ranger.audit.provider.MiscUtil; /** * This is a non-blocking queue with no limit on capacity. */ -public class AuditSummaryQueue extends BaseAuditProvider implements Runnable { +public class AuditSummaryQueue extends AuditQueue implements Runnable { private static final Log logger = LogFactory .getLog(AuditSummaryQueue.class); public static final String PROP_SUMMARY_INTERVAL = "summary.interval.ms"; - LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>(); + LinkedBlockingQueue<AuditEventBase> queue = new LinkedBlockingQueue<AuditEventBase>(); Thread consumerThread = null; static int threadCount = 0; @@ -52,15 +51,11 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable { private static final int MAX_DRAIN = 100000; - private int maxSummaryInterval = 5000; + private int maxSummaryIntervalMs = 5000; HashMap<String, AuditSummary> summaryMap = new HashMap<String, AuditSummary>(); - public AuditSummaryQueue() { - setName(DEFAULT_NAME); - } - - public AuditSummaryQueue(AuditProvider consumer) { + public AuditSummaryQueue(AuditHandler consumer) { super(consumer); setName(DEFAULT_NAME); } @@ -68,9 +63,9 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable { @Override public void init(Properties props, String propPrefix) { super.init(props, propPrefix); - maxSummaryInterval = MiscUtil.getIntProperty(props, propPrefix + "." - + PROP_SUMMARY_INTERVAL, maxSummaryInterval); - logger.info("maxSummaryInterval=" + maxSummaryInterval + ", name=" + maxSummaryIntervalMs = MiscUtil.getIntProperty(props, propPrefix + "." + + PROP_SUMMARY_INTERVAL, maxSummaryIntervalMs); + logger.info("maxSummaryInterval=" + maxSummaryIntervalMs + ", name=" + getName()); } @@ -88,7 +83,6 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable { return false; } queue.add(event); - addLifeTimeInLogCount(1); return true; } @@ -133,23 +127,10 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable { if (consumerThread != null) { consumerThread.interrupt(); } - consumerThread = null; } catch (Throwable t) { // ignore any exception } - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() - */ - @Override - public boolean isFlushPending() { - if (queue.isEmpty()) { - return consumer.isFlushPending(); - } - return true; + consumerThread = null; } /* @@ -164,7 +145,7 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable { while (true) { // Time to next dispatch long nextDispatchDuration = lastDispatchTime - - System.currentTimeMillis() + maxSummaryInterval; + - System.currentTimeMillis() + maxSummaryIntervalMs; Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>(); @@ -184,7 +165,7 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable { } else { // poll returned due to timeout, so reseting clock nextDispatchDuration = lastDispatchTime - - System.currentTimeMillis() + maxSummaryInterval; + - System.currentTimeMillis() + maxSummaryIntervalMs; lastDispatchTime = System.currentTimeMillis(); } } catch (InterruptedException e) { @@ -213,6 +194,9 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable { } if (isDrain() || nextDispatchDuration <= 0) { + // Reset time just before sending the logs + lastDispatchTime = System.currentTimeMillis(); + for (Map.Entry<String, AuditSummary> entry : summaryMap .entrySet()) { AuditSummary auditSummary = entry.getValue(); @@ -221,9 +205,6 @@ public class AuditSummaryQueue extends BaseAuditProvider implements Runnable { - auditSummary.startTime.getTime(); timeDiff = timeDiff > 0 ? timeDiff : 1; auditSummary.event.setEventDurationMS(timeDiff); - - // Reset time just before sending the logs - lastDispatchTime = System.currentTimeMillis(); boolean ret = consumer.log(auditSummary.event); if (!ret) { // We need to drop this event
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java b/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java index c2dc955..87c6a8f 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java @@ -23,7 +23,7 @@ import org.apache.log4j.xml.DOMConfigurator; import org.apache.ranger.audit.model.AuditEventBase; import org.apache.ranger.audit.model.AuthzAuditEvent; import org.apache.ranger.audit.model.EnumRepositoryType; -import org.apache.ranger.audit.provider.AuditProvider; +import org.apache.ranger.audit.provider.AuditHandler; import org.apache.ranger.audit.provider.AuditProviderFactory; import org.apache.commons.logging.LogFactory; @@ -74,7 +74,7 @@ public class TestEvents { AuditProviderFactory.getInstance().init(auditProperties, "hdfs"); - AuditProvider provider = AuditProviderFactory.getAuditProvider(); + AuditHandler provider = AuditProviderFactory.getAuditProvider(); LOG.info("provider=" + provider.toString()); http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java ---------------------------------------------------------------------- diff --git a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java index 45477e2..021c49a 100644 --- a/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java +++ b/security-admin/src/test/java/org/apache/ranger/audit/TestAuditQueue.java @@ -32,14 +32,15 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.audit.destination.FileAuditDestination; import org.apache.ranger.audit.model.AuthzAuditEvent; -import org.apache.ranger.audit.provider.AuditProvider; +import org.apache.ranger.audit.provider.AuditHandler; import org.apache.ranger.audit.provider.AuditProviderFactory; -import org.apache.ranger.audit.provider.BaseAuditProvider; +import org.apache.ranger.audit.provider.BaseAuditHandler; import org.apache.ranger.audit.provider.MiscUtil; import org.apache.ranger.audit.provider.MultiDestAuditProvider; import org.apache.ranger.audit.queue.AuditAsyncQueue; import org.apache.ranger.audit.queue.AuditBatchQueue; import org.apache.ranger.audit.queue.AuditFileSpool; +import org.apache.ranger.audit.queue.AuditQueue; import org.apache.ranger.audit.queue.AuditSummaryQueue; import org.junit.AfterClass; import org.junit.BeforeClass; @@ -93,9 +94,9 @@ public class TestAuditQueue { AuditSummaryQueue queue = new AuditSummaryQueue(testConsumer); Properties props = new Properties(); - props.put(BaseAuditProvider.PROP_DEFAULT_PREFIX + "." + props.put(BaseAuditHandler.PROP_DEFAULT_PREFIX + "." + AuditSummaryQueue.PROP_SUMMARY_INTERVAL, "" + 300); - queue.init(props, BaseAuditProvider.PROP_DEFAULT_PREFIX); + queue.init(props, BaseAuditHandler.PROP_DEFAULT_PREFIX); queue.start(); @@ -103,7 +104,7 @@ public class TestAuditQueue { } private void commonTestSummary(TestConsumer testConsumer, - BaseAuditProvider queue) { + BaseAuditHandler queue) { int messageToSend = 0; int pauseMS = 330; @@ -171,7 +172,6 @@ public class TestAuditQueue { } assertEquals(messageToSend, testConsumer.getSumTotal()); assertEquals(countToCheck, testConsumer.getCountTotal()); - assertNull("Event not in sequnce", testConsumer.isInSequence()); } @Test @@ -182,22 +182,23 @@ public class TestAuditQueue { // Destination String propPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".test"; props.put(propPrefix, "enable"); - props.put(BaseAuditProvider.PROP_DEFAULT_PREFIX + "." + "summary" + "." + props.put(BaseAuditHandler.PROP_DEFAULT_PREFIX + "." + "summary" + "." + "enabled", "true"); - props.put(propPrefix + "." + BaseAuditProvider.PROP_NAME, "test"); - props.put(propPrefix + "." + BaseAuditProvider.PROP_QUEUE, "none"); + props.put(propPrefix + "." + BaseAuditHandler.PROP_NAME, "test"); + props.put(propPrefix + "." + AuditQueue.PROP_QUEUE, "none"); - props.put(BaseAuditProvider.PROP_DEFAULT_PREFIX + "." + props.put(BaseAuditHandler.PROP_DEFAULT_PREFIX + "." + AuditSummaryQueue.PROP_SUMMARY_INTERVAL, "" + 300); - props.put(propPrefix + "." + BaseAuditProvider.PROP_CLASS_NAME, + props.put(propPrefix + "." + BaseAuditHandler.PROP_CLASS_NAME, TestConsumer.class.getName()); AuditProviderFactory factory = AuditProviderFactory.getInstance(); factory.init(props, "test"); - BaseAuditProvider queue = (BaseAuditProvider) factory.getProvider(); - BaseAuditProvider consumer = (BaseAuditProvider) queue.getConsumer(); - while (consumer.getConsumer() != null) { - consumer = (BaseAuditProvider) consumer.getConsumer(); + AuditQueue queue = (AuditQueue) factory.getProvider(); + BaseAuditHandler consumer = (BaseAuditHandler) queue.getConsumer(); + while (consumer != null && consumer instanceof AuditQueue) { + AuditQueue cQueue = (AuditQueue) consumer; + consumer = (BaseAuditHandler) cQueue.getConsumer(); } assertTrue("Consumer should be TestConsumer. class=" + consumer.getClass().getName(), @@ -257,12 +258,12 @@ public class TestAuditQueue { int queueSize = messageToSend * 2; int intervalMS = messageToSend * 100; // Deliberately big interval Properties props = new Properties(); - props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, "" + props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, "" + batchSize); - props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, "" + props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, "" + queueSize); - props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL, - "" + intervalMS); + props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, "" + + intervalMS); TestConsumer testConsumer = new TestConsumer(); AuditBatchQueue queue = new AuditBatchQueue(testConsumer); @@ -308,12 +309,12 @@ public class TestAuditQueue { int expectedBatchSize = (messageToSend * pauseMS) / intervalMS + 1; Properties props = new Properties(); - props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, "" + props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, "" + batchSize); - props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, "" + props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, "" + queueSize); - props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL, - "" + intervalMS); + props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, "" + + intervalMS); TestConsumer testConsumer = new TestConsumer(); AuditBatchQueue queue = new AuditBatchQueue(testConsumer); @@ -356,15 +357,15 @@ public class TestAuditQueue { int queueSize = messageToSend * 2; int intervalMS = Integer.MAX_VALUE; // Deliberately big interval Properties props = new Properties(); - props.put(basePropName + "." + BaseAuditProvider.PROP_NAME, + props.put(basePropName + "." + BaseAuditHandler.PROP_NAME, "testAuditBatchQueueDestDown"); - props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, "" + props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, "" + batchSize); - props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, "" + props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, "" + queueSize); - props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL, - "" + intervalMS); + props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, "" + + intervalMS); // Enable File Spooling props.put(basePropName + "." + "filespool.enable", "" + true); @@ -410,21 +411,20 @@ public class TestAuditQueue { int intervalMS = 3000; // Deliberately big interval Properties props = new Properties(); props.put( - basePropName + "." + BaseAuditProvider.PROP_NAME, + basePropName + "." + BaseAuditHandler.PROP_NAME, "testAuditBatchQueueDestDownFlipFlop_" + MiscUtil.generateUniqueId()); - props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, "" + props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, "" + batchSize); - props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, "" + props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, "" + queueSize); - props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL, - "" + intervalMS); + props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, "" + + intervalMS); // Enable File Spooling int destRetryMS = 10; - props.put( - basePropName + "." + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE, + props.put(basePropName + "." + AuditQueue.PROP_FILE_SPOOL_ENABLE, "" + true); props.put( basePropName + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR, @@ -499,21 +499,20 @@ public class TestAuditQueue { int maxArchivedFiles = 1; Properties props = new Properties(); props.put( - basePropName + "." + BaseAuditProvider.PROP_NAME, + basePropName + "." + BaseAuditHandler.PROP_NAME, "testAuditBatchQueueDestDownRestart_" + MiscUtil.generateUniqueId()); - props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_SIZE, "" + props.put(basePropName + "." + AuditQueue.PROP_BATCH_SIZE, "" + batchSize); - props.put(basePropName + "." + BaseAuditProvider.PROP_QUEUE_SIZE, "" + props.put(basePropName + "." + AuditQueue.PROP_QUEUE_SIZE, "" + queueSize); - props.put(basePropName + "." + BaseAuditProvider.PROP_BATCH_INTERVAL, - "" + intervalMS); + props.put(basePropName + "." + AuditQueue.PROP_BATCH_INTERVAL, "" + + intervalMS); // Enable File Spooling int destRetryMS = 10; - props.put( - basePropName + "." + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE, + props.put(basePropName + "." + AuditQueue.PROP_FILE_SPOOL_ENABLE, "" + true); props.put( basePropName + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR, @@ -598,7 +597,7 @@ public class TestAuditQueue { // Destination String filePropPrefix = AuditProviderFactory.AUDIT_DEST_BASE + ".file"; props.put(filePropPrefix, "enable"); - props.put(filePropPrefix + "." + BaseAuditProvider.PROP_NAME, "file"); + props.put(filePropPrefix + "." + AuditQueue.PROP_NAME, "file"); props.put(filePropPrefix + "." + FileAuditDestination.PROP_FILE_LOCAL_DIR, logFolderName); props.put(filePropPrefix + "." @@ -607,21 +606,20 @@ public class TestAuditQueue { props.put(filePropPrefix + "." + FileAuditDestination.PROP_FILE_FILE_ROLLOVER, "" + 10); - props.put(filePropPrefix + "." + BaseAuditProvider.PROP_QUEUE, "batch"); + props.put(filePropPrefix + "." + AuditQueue.PROP_QUEUE, "batch"); String batchPropPrefix = filePropPrefix + "." + "batch"; - props.put(batchPropPrefix + "." + BaseAuditProvider.PROP_BATCH_SIZE, "" + props.put(batchPropPrefix + "." + AuditQueue.PROP_BATCH_SIZE, "" + batchSize); - props.put(batchPropPrefix + "." + BaseAuditProvider.PROP_QUEUE_SIZE, "" + props.put(batchPropPrefix + "." + AuditQueue.PROP_QUEUE_SIZE, "" + queueSize); - props.put( - batchPropPrefix + "." + BaseAuditProvider.PROP_BATCH_INTERVAL, + props.put(batchPropPrefix + "." + AuditQueue.PROP_BATCH_INTERVAL, "" + intervalMS); // Enable File Spooling int destRetryMS = 10; props.put(batchPropPrefix + "." - + BaseAuditProvider.PROP_FILE_SPOOL_ENABLE, "" + true); + + AuditQueue.PROP_FILE_SPOOL_ENABLE, "" + true); props.put(batchPropPrefix + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR, "target"); props.put(batchPropPrefix + "." @@ -638,7 +636,7 @@ public class TestAuditQueue { // queue.init(props, batchPropPrefix); // queue.start(); - AuditProvider queue = factory.getProvider(); + AuditHandler queue = factory.getProvider(); for (int i = 0; i < messageToSend; i++) { queue.log(createEvent()); http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java ---------------------------------------------------------------------- diff --git a/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java b/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java index d4d50f0..136874d 100644 --- a/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java +++ b/security-admin/src/test/java/org/apache/ranger/audit/TestConsumer.java @@ -39,15 +39,13 @@ public class TestConsumer extends AuditDestination { int batchCount = 0; String providerName = getClass().getName(); boolean isDown = false; - int batchSize = 3; List<AuthzAuditEvent> eventList = new ArrayList<AuthzAuditEvent>(); /* * (non-Javadoc) * - * @see - * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger + * @see org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger * .audit.model.AuditEventBase) */ @Override @@ -144,32 +142,6 @@ public class TestConsumer extends AuditDestination { public void waitToComplete() { } - @Override - public int getMaxBatchSize() { - return batchSize; - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() - */ - @Override - public boolean isFlushPending() { - return false; - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime() - */ - @Override - public long getLastFlushTime() { - return 0; - } - /* * (non-Javadoc) * @@ -207,8 +179,7 @@ public class TestConsumer extends AuditDestination { /* * (non-Javadoc) * - * @see - * org.apache.ranger.audit.provider.AuditProvider#waitToComplete(long) + * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete(long) */ @Override public void waitToComplete(long timeout) { @@ -225,23 +196,14 @@ public class TestConsumer extends AuditDestination { return providerName; } - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#isDrain() - */ - @Override - public boolean isDrain() { - return false; - } - // Local methods public AuthzAuditEvent isInSequence() { - int lastSeq = -1; + long lastSeq = -1; for (AuthzAuditEvent event : eventList) { if (event.getSeqNum() <= lastSeq) { return event; } + lastSeq = event.getSeqNum(); } return null; }
