Repository: incubator-ranger Updated Branches: refs/heads/master 7f451e62b -> fe0858a6c refs/heads/ranger-0.5 13ad49655 -> 7137722f4
RANGER-821 Ranger shutdown hook should not only do its processing asynchronously but also terminate itself if it runs more than a configurable amount of time. Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/7137722f Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/7137722f Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/7137722f Branch: refs/heads/ranger-0.5 Commit: 7137722f48fb14c2eb47874ed8a71c18b0679117 Parents: 13ad496 Author: Alok Lal <[email protected]> Authored: Wed Jan 20 17:43:05 2016 -0800 Committer: Alok Lal <[email protected]> Committed: Mon Jan 25 14:28:22 2016 -0800 ---------------------------------------------------------------------- .../audit/provider/AuditProviderFactory.java | 78 +++++++++++++++++--- 1 file changed, 67 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7137722f/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java index 723b528..bc59897 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java @@ -21,6 +21,8 @@ package org.apache.ranger.audit.provider; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,6 +58,8 @@ public class AuditProviderFactory { public static final String AUDIT_SOLR_IS_ENABLED_PROP = "xasecure.audit.solr.is.enabled"; public static final String AUDIT_DEST_BASE = "xasecure.audit.destination"; + public static final String AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC = "xasecure.audit.shutdown.hook.max.wait.seconds"; + public static final int AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC_DEFAULT = 30; public static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024; public static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT = 5 * 1000; @@ -381,9 +385,7 @@ public class AuditProviderFactory { mProvider.start(); } - JVMShutdownHook jvmShutdownHook = new JVMShutdownHook(mProvider); - - Runtime.getRuntime().addShutdownHook(jvmShutdownHook); + installJvmSutdownHook(props); } private AuditHandler getProviderFromConfig(Properties props, @@ -443,21 +445,75 @@ public class AuditProviderFactory { return new DummyAuditProvider(); } - private static class JVMShutdownHook extends Thread { - AuditHandler mProvider; + private void installJvmSutdownHook(Properties props) { + int shutdownHookMaxWaitSeconds = MiscUtil.getIntProperty(props, AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC, AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC_DEFAULT); + JVMShutdownHook jvmShutdownHook = new JVMShutdownHook(mProvider, shutdownHookMaxWaitSeconds); + Runtime.getRuntime().addShutdownHook(jvmShutdownHook); + } + + private static class RangerAsyncAuditCleanup implements Runnable { - public JVMShutdownHook(AuditHandler provider) { - mProvider = provider; + final Semaphore startCleanup; + final Semaphore doneCleanup; + final AuditHandler mProvider; + + RangerAsyncAuditCleanup(AuditHandler provider, Semaphore startCleanup, Semaphore doneCleanup) { + this.startCleanup = startCleanup; + this.doneCleanup = doneCleanup; + this.mProvider = provider; } + @Override public void run() { - LOG.info("==> JVMShutdownHook.run()"); - try { + while (true) { + LOG.info("RangerAsyncAuditCleanup: Waiting to audit cleanup start signal"); + try { + startCleanup.acquire(); + } catch (InterruptedException e) { + LOG.info("RangerAsyncAuditCleanup: Interrupted while waiting for audit startCleanup signal! Exiting the thread...", e); + break; + } + LOG.info("RangerAsyncAuditCleanup: Starting cleanup"); mProvider.waitToComplete(); mProvider.stop(); - } finally { - LOG.info("<== JVMShutdownHook.run()"); + doneCleanup.release(); + LOG.info("RangerAsyncAuditCleanup: Done cleanup"); + } + } + } + + private static class JVMShutdownHook extends Thread { + final Semaphore startCleanup = new Semaphore(0); + final Semaphore doneCleanup = new Semaphore(0); + final Thread cleanupThread; + final int maxWait; + + public JVMShutdownHook(AuditHandler provider, int maxWait) { + this.maxWait = maxWait; + Runnable runnable = new RangerAsyncAuditCleanup(provider, startCleanup, doneCleanup); + cleanupThread = new Thread(runnable, "Ranger async Audit cleanup"); + cleanupThread.start(); + } + + public void run() { + LOG.info("==> JVMShutdownHook.run()"); + LOG.info("JVMShutdownHook: Signalling async audit cleanup to start."); + startCleanup.release(); + try { + Long start = System.currentTimeMillis(); + LOG.info("JVMShutdownHook: Waiting up to " + maxWait + " seconds for audit cleanup to finish."); + boolean cleanupFinishedInTime = doneCleanup.tryAcquire(maxWait, TimeUnit.SECONDS); + if (cleanupFinishedInTime) { + LOG.info("JVMShutdownHook: Audit cleanup finished after " + (System.currentTimeMillis() - start) + " milli seconds"); + } else { + LOG.warn("JVMShutdownHook: could not detect finishing of audit cleanup even after waiting for " + maxWait + " seconds!"); + } + } catch (InterruptedException e) { + LOG.info("JVMShutdownHook: Interrupted while waiting for completion of Async executor!", e); } + LOG.info("JVMShutdownHook: Interrupting ranger async audit cleanup thread"); + cleanupThread.interrupt(); + LOG.info("<== JVMShutdownHook.run()"); } } }
