RANGER-627 Add start/stop/progress log messages so processing of Audit's JVM shutdown hooks can be monitored
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/0ed5a161 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/0ed5a161 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/0ed5a161 Branch: refs/heads/HDP-2.3.2-groupid Commit: 0ed5a161180755f3fa3ccc523137630bb4a7b2d9 Parents: 6987d30 Author: Alok Lal <[email protected]> Authored: Fri Aug 28 18:19:56 2015 -0700 Committer: Alok Lal <[email protected]> Committed: Fri Sep 11 16:02:17 2015 -0700 ---------------------------------------------------------------------- .../audit/provider/AsyncAuditProvider.java | 48 ++++++++++++++------ .../audit/provider/AuditProviderFactory.java | 9 +++- .../ranger/audit/provider/DbAuditProvider.java | 15 +----- .../audit/provider/MultiDestAuditProvider.java | 10 ++-- 4 files changed, 47 insertions(+), 35 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0ed5a161/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java index f469d80..446ef95 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java @@ -42,6 +42,9 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements private int mMaxQueueSize = 10 * 1024; private int mMaxFlushInterval = 5000; // 5 seconds + private static final int mStopLoopIntervalSecs = 1; // 1 second + private static final int mWaitToCompleteLoopIntervalSecs = 1; // 1 second + // Summary of logs handled private AtomicLong lifeTimeInLogCount = new AtomicLong(0); // Total count, including drop count private AtomicLong lifeTimeOutLogCount = new AtomicLong(0); @@ -110,15 +113,24 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements @Override public void stop() { - mThread.interrupt(); - + LOG.info("==> AsyncAuditProvider.stop()"); try { - mThread.join(); - } catch (InterruptedException excp) { - LOG.error("AsyncAuditProvider.stop(): failed while waiting for thread to exit", excp); - } + LOG.info("Interrupting child thread of " + mName + "..." ); + mThread.interrupt(); + while (mThread.isAlive()) { + try { + LOG.info(String.format("Waiting for child thread of %s to exit. Sleeping for %d secs", mName, mStopLoopIntervalSecs)); + mThread.join(mStopLoopIntervalSecs * 1000); + } catch (InterruptedException e) { + LOG.warn("Interrupted while waiting for child thread to join! Proceeding with stop", e); + break; + } + } - super.stop(); + super.stop(); + } finally { + LOG.info("<== AsyncAuditProvider.stop()"); + } } @Override @@ -144,6 +156,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements flush(); } } catch (InterruptedException excp) { + LOG.info("AsyncAuditProvider.run - Interrupted! Breaking out of while loop."); break; } catch (Exception excp) { logFailedEvent(event, excp); @@ -237,16 +250,21 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements public void waitToComplete(long maxWaitSeconds) { LOG.debug("==> AsyncAuditProvider.waitToComplete()"); - for (long waitTime = 0; !isEmpty() - && (maxWaitSeconds <= 0 || maxWaitSeconds > waitTime); waitTime++) { - try { - Thread.sleep(1000); - } catch (Exception excp) { - // ignore + try { + for (long waitTime = 0; !isEmpty() + && (maxWaitSeconds <= 0 || maxWaitSeconds > waitTime); waitTime += mWaitToCompleteLoopIntervalSecs) { + try { + LOG.info(String.format("%d messages yet to be flushed by %s. Sleeoping for %d sec", mQueue.size(), mName, mWaitToCompleteLoopIntervalSecs)); + Thread.sleep(mWaitToCompleteLoopIntervalSecs * 1000); + } catch (InterruptedException excp) { + // someone really wants service to exit, abandon unwritten audits and exit. + LOG.warn("Caught interrupted exception! " + mQueue.size() + " messages still unflushed! Won't wait for queue to flush, exiting...", excp); + break; + } } + } finally { + LOG.debug("<== AsyncAuditProvider.waitToComplete()"); } - - LOG.debug("<== AsyncAuditProvider.waitToComplete()"); } private long getTimeTillNextFlush() { http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0ed5a161/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java index 1146e0b..723b528 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java @@ -451,8 +451,13 @@ public class AuditProviderFactory { } public void run() { - mProvider.waitToComplete(); - mProvider.stop(); + LOG.info("==> JVMShutdownHook.run()"); + try { + mProvider.waitToComplete(); + mProvider.stop(); + } finally { + LOG.info("<== JVMShutdownHook.run()"); + } } } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0ed5a161/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java index f23f17d..8319d36 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java @@ -106,7 +106,7 @@ public class DbAuditProvider extends AuditDestination { try { if(preCreate(event)) { DaoManager daoMgr = daoManager; - + if(daoMgr != null) { event.persist(daoMgr); @@ -120,6 +120,7 @@ public class DbAuditProvider extends AuditDestination { logFailedEvent(event); } } + LOG.debug("<== DbAuditProvider.log()"); return isSuccess; } @@ -167,18 +168,6 @@ public class DbAuditProvider extends AuditDestination { cleanUp(); } - - @Override - public void waitToComplete() { - LOG.info("DbAuditProvider.waitToComplete()"); - waitToComplete(-1); - } - - @Override - public void waitToComplete(long timeout) { - LOG.info("DbAuditProvider.waitToComplete():timeout=" + timeout); - - } @Override public void flush() { http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/0ed5a161/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java index 26108ca..282f5ab 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java @@ -161,7 +161,7 @@ public class MultiDestAuditProvider extends BaseAuditHandler { try { provider.start(); } catch (Throwable excp) { - LOG.error("AsyncAuditProvider.start(): failed for provider { " + LOG.error("MultiDestAuditProvider.start(): failed for provider { " + provider.getClass().getName() + " }", excp); } } @@ -173,7 +173,7 @@ public class MultiDestAuditProvider extends BaseAuditHandler { try { provider.stop(); } catch (Throwable excp) { - LOG.error("AsyncAuditProvider.stop(): failed for provider { " + LOG.error("MultiDestAuditProvider.stop(): failed for provider { " + provider.getClass().getName() + " }", excp); } } @@ -186,7 +186,7 @@ public class MultiDestAuditProvider extends BaseAuditHandler { provider.waitToComplete(); } catch (Throwable excp) { LOG.error( - "AsyncAuditProvider.waitToComplete(): failed for provider { " + "MultiDestAuditProvider.waitToComplete(): failed for provider { " + provider.getClass().getName() + " }", excp); } } @@ -199,7 +199,7 @@ public class MultiDestAuditProvider extends BaseAuditHandler { provider.waitToComplete(timeout); } catch (Throwable excp) { LOG.error( - "AsyncAuditProvider.waitToComplete(): failed for provider { " + "MultiDestAuditProvider.waitToComplete(): failed for provider { " + provider.getClass().getName() + " }", excp); } } @@ -211,7 +211,7 @@ public class MultiDestAuditProvider extends BaseAuditHandler { try { provider.flush(); } catch (Throwable excp) { - LOG.error("AsyncAuditProvider.flush(): failed for provider { " + LOG.error("MultiDestAuditProvider.flush(): failed for provider { " + provider.getClass().getName() + " }", excp); } }
