Repository: incubator-ranger Updated Branches: refs/heads/stack 3215fc190 -> 3565427b2
RANGER-243: AsyncAuditProvider thread should exit without delay on shutdown Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/7690db82 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/7690db82 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/7690db82 Branch: refs/heads/stack Commit: 7690db8292a8bb42f10fa2a8db4326f759a781a6 Parents: dc0b2c8 Author: Madhan Neethiraj <[email protected]> Authored: Tue Feb 10 22:03:47 2015 -0800 Committer: Madhan Neethiraj <[email protected]> Committed: Tue Feb 10 22:03:47 2015 -0800 ---------------------------------------------------------------------- .../audit/provider/AsyncAuditProvider.java | 44 +++++++++----------- 1 file changed, 20 insertions(+), 24 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7690db82/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java index 2f90642..5da5064 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java @@ -38,7 +38,6 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements private BlockingQueue<AuditEventBase> mQueue = null; private Thread mThread = null; - private boolean mStopThread = false; private String mName = null; private int mMaxQueueSize = 10 * 1024; private int mMaxFlushInterval = 5000; // 5 seconds @@ -109,13 +108,12 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements @Override public void stop() { - mStopThread = true; + mThread.interrupt(); try { mThread.join(); - } catch (InterruptedException e) { - // TODO Auto-generated catch block - e.printStackTrace(); + } catch (InterruptedException excp) { + LOG.error("AsyncAuditProvider.stop(): failed while waiting for thread to exit", excp); } super.stop(); @@ -132,7 +130,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements public void run() { LOG.info("==> AsyncAuditProvider.run()"); - while (!mStopThread) { + while (true) { AuditEventBase event = null; try { event = dequeueEvent(); @@ -142,6 +140,8 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements } else { flush(); } + } catch (InterruptedException excp) { + break; } catch (Exception excp) { logFailedEvent(event, excp); } @@ -167,31 +167,27 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements } } - private AuditEventBase dequeueEvent() { + private AuditEventBase dequeueEvent() throws InterruptedException { AuditEventBase ret = mQueue.poll(); - try { - while(ret == null && !mStopThread) { - logSummaryIfRequired(); + while(ret == null) { + logSummaryIfRequired(); - if (mMaxFlushInterval > 0 && isFlushPending()) { - long timeTillNextFlush = getTimeTillNextFlush(); + if (mMaxFlushInterval > 0 && isFlushPending()) { + long timeTillNextFlush = getTimeTillNextFlush(); - if (timeTillNextFlush <= 0) { - break; // force flush - } + if (timeTillNextFlush <= 0) { + break; // force flush + } - ret = mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS); - } else { - // Let's wake up for summary logging - long waitTime = intervalLogDurationMS - (System.currentTimeMillis() - lastIntervalLogTime); - waitTime = waitTime <= 0 ? intervalLogDurationMS : waitTime; + ret = mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS); + } else { + // Let's wake up for summary logging + long waitTime = intervalLogDurationMS - (System.currentTimeMillis() - lastIntervalLogTime); + waitTime = waitTime <= 0 ? intervalLogDurationMS : waitTime; - ret = mQueue.poll(waitTime, TimeUnit.MILLISECONDS); - } + ret = mQueue.poll(waitTime, TimeUnit.MILLISECONDS); } - } catch(InterruptedException excp) { - LOG.error("AsyncAuditProvider.dequeueEvent()", excp); } if(ret != null) {
