RANGER-627 Add start/stop/progress log messages so processing of Audit's JVM 
shutdown hooks can be monitored


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/0ed5a161
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/0ed5a161
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/0ed5a161

Branch: refs/heads/HDP-2.3.2-groupid
Commit: 0ed5a161180755f3fa3ccc523137630bb4a7b2d9
Parents: 6987d30
Author: Alok Lal <[email protected]>
Authored: Fri Aug 28 18:19:56 2015 -0700
Committer: Alok Lal <[email protected]>
Committed: Fri Sep 11 16:02:17 2015 -0700

----------------------------------------------------------------------
 .../audit/provider/AsyncAuditProvider.java      | 48 ++++++++++++++------
 .../audit/provider/AuditProviderFactory.java    |  9 +++-
 .../ranger/audit/provider/DbAuditProvider.java  | 15 +-----
 .../audit/provider/MultiDestAuditProvider.java  | 10 ++--
 4 files changed, 47 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0ed5a161/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
index f469d80..446ef95 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java
@@ -42,6 +42,9 @@ public class AsyncAuditProvider extends 
MultiDestAuditProvider implements
        private int     mMaxQueueSize     = 10 * 1024;
        private int     mMaxFlushInterval = 5000; // 5 seconds
 
+       private static final int mStopLoopIntervalSecs           = 1; // 1 
second
+       private static final int mWaitToCompleteLoopIntervalSecs = 1; // 1 
second
+
        // Summary of logs handled
        private AtomicLong lifeTimeInLogCount  = new AtomicLong(0); // Total 
count, including drop count
        private AtomicLong lifeTimeOutLogCount = new AtomicLong(0);
@@ -110,15 +113,24 @@ public class AsyncAuditProvider extends 
MultiDestAuditProvider implements
 
        @Override
        public void stop() {
-               mThread.interrupt();
-
+               LOG.info("==> AsyncAuditProvider.stop()");
                try {
-                       mThread.join();
-               } catch (InterruptedException excp) {
-                       LOG.error("AsyncAuditProvider.stop(): failed while 
waiting for thread to exit", excp);
-               }
+                       LOG.info("Interrupting child thread of " + mName + 
"..." );
+                       mThread.interrupt();
+                       while (mThread.isAlive()) {
+                               try {
+                                       LOG.info(String.format("Waiting for 
child thread of %s to exit.  Sleeping for %d secs", mName, 
mStopLoopIntervalSecs));
+                                       mThread.join(mStopLoopIntervalSecs * 
1000);
+                               } catch (InterruptedException e) {
+                                       LOG.warn("Interrupted while waiting for 
child thread to join!  Proceeding with stop", e);
+                                       break;
+                               }
+                       }
 
-               super.stop();
+                       super.stop();
+               } finally {
+                       LOG.info("<== AsyncAuditProvider.stop()");
+               }
        }
 
        @Override
@@ -144,6 +156,7 @@ public class AsyncAuditProvider extends 
MultiDestAuditProvider implements
                                        flush();
                                }
                        } catch (InterruptedException excp) {
+                               LOG.info("AsyncAuditProvider.run - Interrupted! 
 Breaking out of while loop.");
                                break;
                        } catch (Exception excp) {
                                logFailedEvent(event, excp);
@@ -237,16 +250,21 @@ public class AsyncAuditProvider extends 
MultiDestAuditProvider implements
        public void waitToComplete(long maxWaitSeconds) {
                LOG.debug("==> AsyncAuditProvider.waitToComplete()");
 
-               for (long waitTime = 0; !isEmpty()
-                               && (maxWaitSeconds <= 0 || maxWaitSeconds > 
waitTime); waitTime++) {
-                       try {
-                               Thread.sleep(1000);
-                       } catch (Exception excp) {
-                               // ignore
+               try {
+                       for (long waitTime = 0; !isEmpty()
+                                       && (maxWaitSeconds <= 0 || 
maxWaitSeconds > waitTime); waitTime += mWaitToCompleteLoopIntervalSecs) {
+                               try {
+                                       LOG.info(String.format("%d messages yet 
to be flushed by %s.  Sleeoping for %d sec", mQueue.size(), mName, 
mWaitToCompleteLoopIntervalSecs));
+                                       
Thread.sleep(mWaitToCompleteLoopIntervalSecs * 1000);
+                               } catch (InterruptedException excp) {
+                                       // someone really wants service to 
exit, abandon unwritten audits and exit.
+                                       LOG.warn("Caught interrupted exception! 
" + mQueue.size() + " messages still unflushed!  Won't wait for queue to flush, 
exiting...", excp);
+                                       break;
+                               }
                        }
+               } finally {
+                       LOG.debug("<== AsyncAuditProvider.waitToComplete()");
                }
-
-               LOG.debug("<== AsyncAuditProvider.waitToComplete()");
        }
 
        private long getTimeTillNextFlush() {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0ed5a161/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index 1146e0b..723b528 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -451,8 +451,13 @@ public class AuditProviderFactory {
                }
 
                public void run() {
-                       mProvider.waitToComplete();
-                       mProvider.stop();
+                       LOG.info("==> JVMShutdownHook.run()");
+                       try {
+                               mProvider.waitToComplete();
+                               mProvider.stop();
+                       } finally {
+                               LOG.info("<== JVMShutdownHook.run()");
+                       }
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0ed5a161/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
index f23f17d..8319d36 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
@@ -106,7 +106,7 @@ public class DbAuditProvider extends AuditDestination {
                try {
                        if(preCreate(event)) {
                                DaoManager daoMgr = daoManager;
-       
+
                                if(daoMgr != null) {
                                        event.persist(daoMgr);
        
@@ -120,6 +120,7 @@ public class DbAuditProvider extends AuditDestination {
                                logFailedEvent(event);
                        }
                }
+               LOG.debug("<== DbAuditProvider.log()");
                return isSuccess;
        }
 
@@ -167,18 +168,6 @@ public class DbAuditProvider extends AuditDestination {
 
                cleanUp();
        }
-       
-       @Override
-    public void waitToComplete() {
-               LOG.info("DbAuditProvider.waitToComplete()");
-               waitToComplete(-1);
-       }
-
-       @Override
-       public void waitToComplete(long timeout) {
-               LOG.info("DbAuditProvider.waitToComplete():timeout=" + timeout);
-
-       }
 
        @Override
        public void flush() {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0ed5a161/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
index 26108ca..282f5ab 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java
@@ -161,7 +161,7 @@ public class MultiDestAuditProvider extends 
BaseAuditHandler {
                        try {
                                provider.start();
                        } catch (Throwable excp) {
-                               LOG.error("AsyncAuditProvider.start(): failed 
for provider { "
+                               LOG.error("MultiDestAuditProvider.start(): 
failed for provider { "
                                                + provider.getClass().getName() 
+ " }", excp);
                        }
                }
@@ -173,7 +173,7 @@ public class MultiDestAuditProvider extends 
BaseAuditHandler {
                        try {
                                provider.stop();
                        } catch (Throwable excp) {
-                               LOG.error("AsyncAuditProvider.stop(): failed 
for provider { "
+                               LOG.error("MultiDestAuditProvider.stop(): 
failed for provider { "
                                                + provider.getClass().getName() 
+ " }", excp);
                        }
                }
@@ -186,7 +186,7 @@ public class MultiDestAuditProvider extends 
BaseAuditHandler {
                                provider.waitToComplete();
                        } catch (Throwable excp) {
                                LOG.error(
-                                               
"AsyncAuditProvider.waitToComplete(): failed for provider { "
+                                               
"MultiDestAuditProvider.waitToComplete(): failed for provider { "
                                                                + 
provider.getClass().getName() + " }", excp);
                        }
                }
@@ -199,7 +199,7 @@ public class MultiDestAuditProvider extends 
BaseAuditHandler {
                                provider.waitToComplete(timeout);
                        } catch (Throwable excp) {
                                LOG.error(
-                                               
"AsyncAuditProvider.waitToComplete(): failed for provider { "
+                                               
"MultiDestAuditProvider.waitToComplete(): failed for provider { "
                                                                + 
provider.getClass().getName() + " }", excp);
                        }
                }
@@ -211,7 +211,7 @@ public class MultiDestAuditProvider extends 
BaseAuditHandler {
                        try {
                                provider.flush();
                        } catch (Throwable excp) {
-                               LOG.error("AsyncAuditProvider.flush(): failed 
for provider { "
+                               LOG.error("MultiDestAuditProvider.flush(): 
failed for provider { "
                                                + provider.getClass().getName() 
+ " }", excp);
                        }
                }

Reply via email to