Repository: incubator-ranger Updated Branches: refs/heads/master 82960509b -> d174576a4
RANGER-397 - Fix legacy broken AsyncAuditProvider because of new changes Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/de6e8459 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/de6e8459 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/de6e8459 Branch: refs/heads/master Commit: de6e8459c43b9aa00b52ae5dc51b5655fd206be8 Parents: 20baa3f Author: Don Bosco Durai <[email protected]> Authored: Fri Apr 24 11:25:44 2015 -0700 Committer: Don Bosco Durai <[email protected]> Committed: Fri Apr 24 11:25:44 2015 -0700 ---------------------------------------------------------------------- .../audit/provider/AsyncAuditProvider.java | 61 ++++++++++---------- 1 file changed, 32 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/de6e8459/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java index c3a0c78..f469d80 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java @@ -51,6 +51,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements private AtomicLong intervalDropCount = new AtomicLong(0); private long lastIntervalLogTime = System.currentTimeMillis(); private int intervalLogDurationMS = 60000; + private long lastFlushTime = System.currentTimeMillis(); public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval) { LOG.info("AsyncAuditProvider(" + name + "): creating.."); @@ -139,6 +140,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements if (event != null) { super.log(event); } else { + lastFlushTime = System.currentTimeMillis(); flush(); } } catch (InterruptedException excp) { @@ -149,6 +151,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements } try { + lastFlushTime = System.currentTimeMillis(); flush(); } catch (Exception excp) { LOG.error("AsyncAuditProvider.run()", excp); @@ -174,21 +177,21 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements while(ret == null) { logSummaryIfRequired(); -// if (mMaxFlushInterval > 0 && isFlushPending()) { -// long timeTillNextFlush = getTimeTillNextFlush(); -// -// if (timeTillNextFlush <= 0) { -// break; // force flush -// } -// -// ret = mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS); -// } else { + if (mMaxFlushInterval > 0 ) { + long timeTillNextFlush = getTimeTillNextFlush(); + + if (timeTillNextFlush <= 0) { + break; // force flush + } + + ret = mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS); + } else { // Let's wake up for summary logging long waitTime = intervalLogDurationMS - (System.currentTimeMillis() - lastIntervalLogTime); waitTime = waitTime <= 0 ? intervalLogDurationMS : waitTime; ret = mQueue.poll(waitTime, TimeUnit.MILLISECONDS); -// } + } } if(ret != null) { @@ -246,23 +249,23 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements LOG.debug("<== AsyncAuditProvider.waitToComplete()"); } -// private long getTimeTillNextFlush() { -// long timeTillNextFlush = mMaxFlushInterval; -// -// if (mMaxFlushInterval > 0) { -// long lastFlushTime = getLastFlushTime(); -// -// if (lastFlushTime != 0) { -// long timeSinceLastFlush = System.currentTimeMillis() -// - lastFlushTime; -// -// if (timeSinceLastFlush >= mMaxFlushInterval) -// timeTillNextFlush = 0; -// else -// timeTillNextFlush = mMaxFlushInterval - timeSinceLastFlush; -// } -// } -// -// return timeTillNextFlush; -// } + private long getTimeTillNextFlush() { + long timeTillNextFlush = mMaxFlushInterval; + + if (mMaxFlushInterval > 0) { + + if (lastFlushTime != 0) { + long timeSinceLastFlush = System.currentTimeMillis() + - lastFlushTime; + + if (timeSinceLastFlush >= mMaxFlushInterval) { + timeTillNextFlush = 0; + } else { + timeTillNextFlush = mMaxFlushInterval - timeSinceLastFlush; + } + } + } + + return timeTillNextFlush; + } }
