Repository: incubator-ranger
Updated Branches:
  refs/heads/master 7f451e62b -> fe0858a6c
  refs/heads/ranger-0.5 13ad49655 -> 7137722f4


RANGER-821: The Ranger shutdown hook should not only do its processing
asynchronously but also terminate itself if it runs for more than a
configurable amount of time.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/7137722f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/7137722f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/7137722f

Branch: refs/heads/ranger-0.5
Commit: 7137722f48fb14c2eb47874ed8a71c18b0679117
Parents: 13ad496
Author: Alok Lal <[email protected]>
Authored: Wed Jan 20 17:43:05 2016 -0800
Committer: Alok Lal <[email protected]>
Committed: Mon Jan 25 14:28:22 2016 -0800

----------------------------------------------------------------------
 .../audit/provider/AuditProviderFactory.java    | 78 +++++++++++++++++---
 1 file changed, 67 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/7137722f/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index 723b528..bc59897 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -21,6 +21,8 @@ package org.apache.ranger.audit.provider;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -56,6 +58,8 @@ public class AuditProviderFactory {
        public static final String AUDIT_SOLR_IS_ENABLED_PROP = 
"xasecure.audit.solr.is.enabled";
 
        public static final String AUDIT_DEST_BASE = 
"xasecure.audit.destination";
+       public static final String AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC = 
"xasecure.audit.shutdown.hook.max.wait.seconds";
+       public static final int AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC_DEFAULT = 30;
 
        public static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024;
        public static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT = 5 * 
1000;
@@ -381,9 +385,7 @@ public class AuditProviderFactory {
                        mProvider.start();
                }
 
-               JVMShutdownHook jvmShutdownHook = new 
JVMShutdownHook(mProvider);
-
-               Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
+               installJvmSutdownHook(props);
        }
 
        private AuditHandler getProviderFromConfig(Properties props,
@@ -443,21 +445,75 @@ public class AuditProviderFactory {
                return new DummyAuditProvider();
        }
 
-       private static class JVMShutdownHook extends Thread {
-               AuditHandler mProvider;
+       private void installJvmSutdownHook(Properties props) {
+               int shutdownHookMaxWaitSeconds = MiscUtil.getIntProperty(props, 
AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC, AUDIT_SHUTDOWN_HOOK_MAX_WAIT_SEC_DEFAULT);
+               JVMShutdownHook jvmShutdownHook = new 
JVMShutdownHook(mProvider, shutdownHookMaxWaitSeconds);
+               Runtime.getRuntime().addShutdownHook(jvmShutdownHook);
+       }
+
+       private static class RangerAsyncAuditCleanup implements Runnable {
 
-               public JVMShutdownHook(AuditHandler provider) {
-                       mProvider = provider;
+               final Semaphore startCleanup;
+               final Semaphore doneCleanup;
+               final AuditHandler mProvider;
+
+               RangerAsyncAuditCleanup(AuditHandler provider, Semaphore 
startCleanup, Semaphore doneCleanup) {
+                       this.startCleanup = startCleanup;
+                       this.doneCleanup = doneCleanup;
+                       this.mProvider = provider;
                }
 
+               @Override
                public void run() {
-                       LOG.info("==> JVMShutdownHook.run()");
-                       try {
+                       while (true) {
+                               LOG.info("RangerAsyncAuditCleanup: Waiting to 
audit cleanup start signal");
+                               try {
+                                       startCleanup.acquire();
+                               } catch (InterruptedException e) {
+                                       LOG.info("RangerAsyncAuditCleanup: 
Interrupted while waiting for audit startCleanup signal!  Exiting the 
thread...", e);
+                                       break;
+                               }
+                               LOG.info("RangerAsyncAuditCleanup: Starting 
cleanup");
                                mProvider.waitToComplete();
                                mProvider.stop();
-                       } finally {
-                               LOG.info("<== JVMShutdownHook.run()");
+                               doneCleanup.release();
+                               LOG.info("RangerAsyncAuditCleanup: Done 
cleanup");
+                       }
+               }
+       }
+
+       private static class JVMShutdownHook extends Thread {
+               final Semaphore startCleanup = new Semaphore(0);
+               final Semaphore doneCleanup = new Semaphore(0);
+               final Thread cleanupThread;
+               final int maxWait;
+
+               public JVMShutdownHook(AuditHandler provider, int maxWait) {
+                       this.maxWait = maxWait;
+                       Runnable runnable = new 
RangerAsyncAuditCleanup(provider, startCleanup, doneCleanup);
+                       cleanupThread = new Thread(runnable, "Ranger async 
Audit cleanup");
+                       cleanupThread.start();
+               }
+
+               public void run() {
+                       LOG.info("==> JVMShutdownHook.run()");
+                       LOG.info("JVMShutdownHook: Signalling async audit 
cleanup to start.");
+                       startCleanup.release();
+                       try {
+                               Long start = System.currentTimeMillis();
+                               LOG.info("JVMShutdownHook: Waiting up to " + 
maxWait + " seconds for audit cleanup to finish.");
+                               boolean cleanupFinishedInTime = 
doneCleanup.tryAcquire(maxWait, TimeUnit.SECONDS);
+                               if (cleanupFinishedInTime) {
+                                       LOG.info("JVMShutdownHook: Audit 
cleanup finished after " + (System.currentTimeMillis() - start) + " milli 
seconds");
+                               } else {
+                                       LOG.warn("JVMShutdownHook: could not 
detect finishing of audit cleanup even after waiting for " + maxWait + " 
seconds!");
+                               }
+                       } catch (InterruptedException e) {
+                               LOG.info("JVMShutdownHook: Interrupted while 
waiting for completion of Async executor!", e);
                        }
+                       LOG.info("JVMShutdownHook: Interrupting ranger async 
audit cleanup thread");
+                       cleanupThread.interrupt();
+                       LOG.info("<== JVMShutdownHook.run()");
                }
        }
 }

Reply via email to