Repository: incubator-ranger Updated Branches: refs/heads/master 9475c0f1f -> 3634faa12
RANGER-821 Ranger shutdown hook should not only do its processing asynchronously but also terminate itself if it runs more than a configurable amount of time. Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/3634faa1 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/3634faa1 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/3634faa1 Branch: refs/heads/master Commit: 3634faa1253bbb3298e61630defbaf76d032dff6 Parents: 9475c0f Author: Alok Lal <[email protected]> Authored: Wed Jan 20 17:43:05 2016 -0800 Committer: Alok Lal <[email protected]> Committed: Tue Jan 26 11:49:37 2016 -0800 ---------------------------------------------------------------------- .../audit/provider/AuditProviderFactory.java | 79 +++++++++++++++++--- 1 file changed, 68 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/3634faa1/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java index 723b528..cb998cd 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java @@ -21,6 +21,8 @@ package org.apache.ranger.audit.provider; import java.util.ArrayList; import java.util.List; import java.util.Properties; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -56,6 +58,8 @@ public class AuditProviderFactory { public static final String AUDIT_SOLR_IS_ENABLED_PROP = "xasecure.audit.solr.is.enabled"; public static final String AUDIT_DEST_BASE = "xasecure.audit.destination"; + public static final String AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC = "xasecure.audit.shutdown.hook.max.wait.seconds"; + public static final int AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC_DEFAULT = 30; public static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024; public static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT = 5 * 1000; @@ -381,9 +385,7 @@ public class AuditProviderFactory { mProvider.start(); } - JVMShutdownHook jvmShutdownHook = new JVMShutdownHook(mProvider); - - Runtime.getRuntime().addShutdownHook(jvmShutdownHook); + installJvmSutdownHook(props); } private AuditHandler getProviderFromConfig(Properties props, @@ -443,21 +445,76 @@ public class AuditProviderFactory { return new DummyAuditProvider(); } - private static class JVMShutdownHook extends Thread { - AuditHandler mProvider; + private void installJvmSutdownHook(Properties props) { + int shutdownHookMaxWaitSeconds = MiscUtil.getIntProperty(props, AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC, AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC_DEFAULT); + JVMShutdownHook jvmShutdownHook = new JVMShutdownHook(mProvider, shutdownHookMaxWaitSeconds); + Runtime.getRuntime().addShutdownHook(jvmShutdownHook); + } + + private static class RangerAsyncAuditCleanup implements Runnable { - public JVMShutdownHook(AuditHandler provider) { - mProvider = provider; + final Semaphore startCleanup; + final Semaphore doneCleanup; + final AuditHandler mProvider; + + RangerAsyncAuditCleanup(AuditHandler provider, Semaphore startCleanup, Semaphore doneCleanup) { + this.startCleanup = startCleanup; + this.doneCleanup = doneCleanup; + this.mProvider = provider; } + @Override public void run() { - LOG.info("==> JVMShutdownHook.run()"); - try { + while (true) { + LOG.info("RangerAsyncAuditCleanup: Waiting to audit cleanup start signal"); + try { + startCleanup.acquire(); + } catch (InterruptedException e) { + LOG.info("RangerAsyncAuditCleanup: Interrupted while waiting for audit startCleanup signal! Exiting the thread...", e); + break; + } + LOG.info("RangerAsyncAuditCleanup: Starting cleanup"); mProvider.waitToComplete(); mProvider.stop(); - } finally { - LOG.info("<== JVMShutdownHook.run()"); + doneCleanup.release(); + LOG.info("RangerAsyncAuditCleanup: Done cleanup"); + } + } + } + + private static class JVMShutdownHook extends Thread { + final Semaphore startCleanup = new Semaphore(0); + final Semaphore doneCleanup = new Semaphore(0); + final Thread cleanupThread; + final int maxWait; + + public JVMShutdownHook(AuditHandler provider, int maxWait) { + this.maxWait = maxWait; + Runnable runnable = new RangerAsyncAuditCleanup(provider, startCleanup, doneCleanup); + cleanupThread = new Thread(runnable, "Ranger async Audit cleanup"); + cleanupThread.setDaemon(true); + cleanupThread.start(); + } + + public void run() { + LOG.info("==> JVMShutdownHook.run()"); + LOG.info("JVMShutdownHook: Signalling async audit cleanup to start."); + startCleanup.release(); + try { + Long start = System.currentTimeMillis(); + LOG.info("JVMShutdownHook: Waiting up to " + maxWait + " seconds for audit cleanup to finish."); + boolean cleanupFinishedInTime = doneCleanup.tryAcquire(maxWait, TimeUnit.SECONDS); + if (cleanupFinishedInTime) { + LOG.info("JVMShutdownHook: Audit cleanup finished after " + (System.currentTimeMillis() - start) + " milli seconds"); + } else { + LOG.warn("JVMShutdownHook: could not detect finishing of audit cleanup even after waiting for " + maxWait + " seconds!"); + } + } catch (InterruptedException e) { + LOG.info("JVMShutdownHook: Interrupted while waiting for completion of Async executor!", e); } + LOG.info("JVMShutdownHook: Interrupting ranger async audit cleanup thread"); + cleanupThread.interrupt(); + LOG.info("<== JVMShutdownHook.run()"); } } }
