Repository: incubator-ranger
Updated Branches:
  refs/heads/stack 3215fc190 -> 3565427b2


RANGER-243: AsyncAuditProvider thread should exit without delay on shutdown


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/7690db82
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/7690db82
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/7690db82

Branch: refs/heads/stack
Commit: 7690db8292a8bb42f10fa2a8db4326f759a781a6
Parents: dc0b2c8
Author: Madhan Neethiraj <[email protected]>
Authored: Tue Feb 10 22:03:47 2015 -0800
Committer: Madhan Neethiraj <[email protected]>
Committed: Tue Feb 10 22:03:47 2015 -0800

----------------------------------------------------------------------
 .../audit/provider/AsyncAuditProvider.java      | 44 +++++++++-----------
 1 file changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7690db82/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
index 2f90642..5da5064 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
@@ -38,7 +38,6 @@ public class AsyncAuditProvider extends 
MultiDestAuditProvider implements
 
        private BlockingQueue<AuditEventBase> mQueue = null;
        private Thread  mThread           = null;
-       private boolean mStopThread       = false;
        private String  mName             = null;
        private int     mMaxQueueSize     = 10 * 1024;
        private int     mMaxFlushInterval = 5000; // 5 seconds
@@ -109,13 +108,12 @@ public class AsyncAuditProvider extends 
MultiDestAuditProvider implements
 
        @Override
        public void stop() {
-               mStopThread = true;
+               mThread.interrupt();
 
                try {
                        mThread.join();
-               } catch (InterruptedException e) {
-                       // TODO Auto-generated catch block
-                       e.printStackTrace();
+               } catch (InterruptedException excp) {
+                       LOG.error("AsyncAuditProvider.stop(): failed while 
waiting for thread to exit", excp);
                }
 
                super.stop();
@@ -132,7 +130,7 @@ public class AsyncAuditProvider extends 
MultiDestAuditProvider implements
        public void run() {
                LOG.info("==> AsyncAuditProvider.run()");
 
-               while (!mStopThread) {
+               while (true) {
                        AuditEventBase event = null;
                        try {
                                event = dequeueEvent();
@@ -142,6 +140,8 @@ public class AsyncAuditProvider extends 
MultiDestAuditProvider implements
                                } else {
                                        flush();
                                }
+                       } catch (InterruptedException excp) {
+                               break;
                        } catch (Exception excp) {
                                logFailedEvent(event, excp);
                        }
@@ -167,31 +167,27 @@ public class AsyncAuditProvider extends 
MultiDestAuditProvider implements
                }
        }
 
-       private AuditEventBase dequeueEvent() {
+       private AuditEventBase dequeueEvent() throws InterruptedException {
                AuditEventBase ret = mQueue.poll();
 
-               try {
-                       while(ret == null && !mStopThread) {
-                               logSummaryIfRequired();
+               while(ret == null) {
+                       logSummaryIfRequired();
 
-                               if (mMaxFlushInterval > 0 && isFlushPending()) {
-                                       long timeTillNextFlush = 
getTimeTillNextFlush();
+                       if (mMaxFlushInterval > 0 && isFlushPending()) {
+                               long timeTillNextFlush = getTimeTillNextFlush();
 
-                                       if (timeTillNextFlush <= 0) {
-                                               break; // force flush
-                                       }
+                               if (timeTillNextFlush <= 0) {
+                                       break; // force flush
+                               }
 
-                                       ret = mQueue.poll(timeTillNextFlush, 
TimeUnit.MILLISECONDS);
-                               } else {
-                                       // Let's wake up for summary logging
-                                       long waitTime = intervalLogDurationMS - 
(System.currentTimeMillis() - lastIntervalLogTime);
-                                       waitTime = waitTime <= 0 ? 
intervalLogDurationMS : waitTime;
+                               ret = mQueue.poll(timeTillNextFlush, 
TimeUnit.MILLISECONDS);
+                       } else {
+                               // Let's wake up for summary logging
+                               long waitTime = intervalLogDurationMS - 
(System.currentTimeMillis() - lastIntervalLogTime);
+                               waitTime = waitTime <= 0 ? 
intervalLogDurationMS : waitTime;
 
-                                       ret = mQueue.poll(waitTime, 
TimeUnit.MILLISECONDS);
-                               }
+                               ret = mQueue.poll(waitTime, 
TimeUnit.MILLISECONDS);
                        }
-               } catch(InterruptedException excp) {
-                       LOG.error("AsyncAuditProvider.dequeueEvent()", excp);
                }
 
                if(ret != null) {

Reply via email to