RANGER-397 Support RDBMS as audit destination using V3 configuration

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/a2de2450
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/a2de2450
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/a2de2450

Branch: refs/heads/tag-policy
Commit: a2de2450a572468af1928d5d021567c39544e193
Parents: 9e5bd85
Author: Don Bosco Durai <[email protected]>
Authored: Fri May 29 14:54:22 2015 -0700
Committer: Don Bosco Durai <[email protected]>
Committed: Fri May 29 14:54:22 2015 -0700

----------------------------------------------------------------------
 .../org/apache/ranger/audit/dao/DaoManager.java |   2 +
 .../audit/destination/DBAuditDestination.java   | 306 +++++++++++++++++++
 .../audit/destination/HDFSAuditDestination.java |   3 +
 .../audit/provider/AuditProviderFactory.java    |   3 +-
 .../ranger/audit/provider/BaseAuditHandler.java |   5 +-
 .../apache/ranger/audit/provider/MiscUtil.java  |  15 +
 .../ranger/audit/queue/AuditAsyncQueue.java     |  25 +-
 .../ranger/audit/queue/AuditBatchQueue.java     |  24 +-
 .../apache/ranger/audit/queue/AuditQueue.java   |   6 +
 .../ranger/audit/queue/AuditSummaryQueue.java   |  25 +-
 10 files changed, 409 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java
index 6d81744..fd4d096 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java
@@ -49,6 +49,8 @@ public class DaoManager extends DaoManagerBase {
 
                                sEntityManager.set(em);
                        }
+               } else {
+                       logger.error("EntityManagerFactory was not set in this 
thread.", new Throwable());
                }
 
                return em;

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
new file mode 100644
index 0000000..c58748e
--- /dev/null
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java
@@ -0,0 +1,306 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.ranger.audit.destination;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.persistence.EntityManager;
+import javax.persistence.EntityManagerFactory;
+import javax.persistence.EntityTransaction;
+import javax.persistence.Persistence;
+
+import org.apache.ranger.audit.dao.DaoManager;
+import org.apache.ranger.audit.model.AuditEventBase;
+import org.apache.ranger.audit.provider.MiscUtil;
+
+public class DBAuditDestination extends AuditDestination {
+
+       private static final Log logger = LogFactory
+                       .getLog(DBAuditDestination.class);
+
+       public static final String PROP_DB_JDBC_DRIVER = "jdbc.driver";
+       public static final String PROP_DB_JDBC_URL = "jdbc.url";
+       public static final String PROP_DB_USER = "user";
+       public static final String PROP_DB_PASSWORD = "password";
+       public static final String PROP_DB_PASSWORD_ALIAS = "password.alias";
+
+       private EntityManagerFactory entityManagerFactory;
+       private DaoManager daoManager;
+
+       private String jdbcDriver = null;
+       private String jdbcURL = null;
+       private String dbUser = null;
+       private String dbPasswordAlias = "auditDBCred";
+
+       public DBAuditDestination() {
+               logger.info("DBAuditDestination() called");
+       }
+
+       @Override
+       public void init(Properties props, String propPrefix) {
+               logger.info("init() called");
+               super.init(props, propPrefix);
+
+               // Initial connect
+               connect();
+       }
+
+       /*
+        * (non-Javadoc)
+        * 
+        * @see
+        * 
org.apache.ranger.audit.provider.AuditHandler#logger(java.util.Collection
+        * )
+        */
+       @Override
+       public boolean log(Collection<AuditEventBase> events) {
+               boolean retValue = false;
+
+               if (!beginTransaction()) {
+                       return false;
+               }
+               boolean isFailed = false;
+               for (AuditEventBase event : events) {
+                       try {
+                               event.persist(daoManager);
+                       } catch (Throwable t) {
+                               logger.error("Error persisting data. event=" + 
event, t);
+                               isFailed = true;
+                               break;
+                       }
+               }
+               if (isFailed) {
+                       retValue = false;
+                       rollbackTransaction();
+               } else {
+                       retValue = commitTransaction();
+               }
+               return retValue;
+       }
+
+       @Override
+       public void stop() {
+               cleanUp();
+               super.stop();
+       }
+
+       // Local methods
+       protected void connect() {
+               if (isDbConnected()) {
+                       return;
+               }
+               try {
+                       jdbcDriver = MiscUtil.getStringProperty(props, 
propPrefix + "."
+                                       + PROP_DB_JDBC_DRIVER);
+                       jdbcURL = MiscUtil.getStringProperty(props, propPrefix 
+ "."
+                                       + PROP_DB_JDBC_URL);
+                       dbUser = MiscUtil.getStringProperty(props, propPrefix + 
"."
+                                       + PROP_DB_USER);
+                       String dbPassword = MiscUtil.getStringProperty(props, 
propPrefix
+                                       + "." + PROP_DB_PASSWORD);
+                       String tmpAlias = MiscUtil.getStringProperty(props, 
propPrefix
+                                       + "." + PROP_DB_PASSWORD_ALIAS);
+                       dbPasswordAlias = tmpAlias != null ? tmpAlias : 
dbPasswordAlias;
+                       String credFile = MiscUtil.getStringProperty(props,
+                                       AUDIT_DB_CREDENTIAL_PROVIDER_FILE);
+
+                       if (jdbcDriver == null || jdbcDriver.isEmpty()) {
+                               logger.fatal("JDBC driver not provided. Set 
property name "
+                                               + propPrefix + "." + 
PROP_DB_JDBC_DRIVER);
+                               return;
+                       }
+                       if (jdbcURL == null || jdbcURL.isEmpty()) {
+                               logger.fatal("JDBC URL not provided. Set 
property name "
+                                               + propPrefix + "." + 
PROP_DB_JDBC_URL);
+                               return;
+                       }
+                       if (dbUser == null || dbUser.isEmpty()) {
+                               logger.fatal("DB user not provided. Set 
property name "
+                                               + propPrefix + "." + 
PROP_DB_USER);
+                               return;
+                       }
+                       if (dbPassword == null || dbPassword.isEmpty()) {
+                               logger.warn("DB password not provided. Will 
assume empty for now. Set property name "
+                                               + propPrefix + "." + 
PROP_DB_PASSWORD);
+                       } else {
+                               dbPassword = 
MiscUtil.getCredentialString(credFile,
+                                               dbPasswordAlias);
+                       }
+                       logger.info("JDBC Driver=" + jdbcDriver + ", JDBC URL=" 
+ jdbcURL
+                                       + ", dbUser=" + dbUser + ", 
passwordAlias="
+                                       + dbPasswordAlias + ", credFile=" + 
credFile);
+
+                       Map<String, String> dbProperties = new HashMap<String, 
String>();
+                       dbProperties.put("javax.persistence.jdbc.driver", 
jdbcDriver);
+                       dbProperties.put("javax.persistence.jdbc.url", jdbcURL);
+                       dbProperties.put("javax.persistence.jdbc.user", dbUser);
+                       if (dbPassword != null) {
+                               
dbProperties.put("javax.persistence.jdbc.password", dbPassword);
+                       }
+
+                       entityManagerFactory = 
Persistence.createEntityManagerFactory(
+                                       "xa_server", dbProperties);
+
+                       logger.info("entityManagerFactory=" + 
entityManagerFactory);
+
+                       daoManager = new DaoManager();
+                       
daoManager.setEntityManagerFactory(entityManagerFactory);
+
+                       // this forces the connection to be made to DB
+                       if (daoManager.getEntityManager() != null) {
+                               logger.error("Error connecting audit database. 
EntityManager is null. dbURL="
+                                               + jdbcURL + ", dbUser=" + 
dbUser);
+                       }
+
+               } catch (Throwable t) {
+                       logger.error("Error connecting audit database. dbURL=" 
+ jdbcURL
+                                       + ", dbUser=" + dbUser, t);
+               }
+       }
+
+       private synchronized void cleanUp() {
+               logger.info("DBAuditDestination: cleanUp()");
+
+               try {
+                       if (entityManagerFactory != null && 
entityManagerFactory.isOpen()) {
+                               entityManagerFactory.close();
+                       }
+               } catch (Exception excp) {
+                       logger.error("DBAuditDestination.cleanUp(): failed", 
excp);
+               } finally {
+                       entityManagerFactory = null;
+                       daoManager = null;
+               }
+       }
+
+       private EntityManager getEntityManager() {
+               DaoManager daoMgr = daoManager;
+
+               if (daoMgr != null) {
+                       try {
+                               return daoMgr.getEntityManager();
+                       } catch (Exception excp) {
+                               
logger.error("DBAuditDestination.getEntityManager(): failed",
+                                               excp);
+
+                               cleanUp();
+                       }
+               }
+
+               return null;
+       }
+
+       private boolean isDbConnected() {
+               EntityManager em = getEntityManager();
+               return em != null && em.isOpen();
+       }
+
+       private void clearEntityManager() {
+               try {
+                       EntityManager em = getEntityManager();
+
+                       if (em != null) {
+                               em.clear();
+                       }
+               } catch (Exception excp) {
+                       logger.warn("DBAuditDestination.clearEntityManager(): 
failed", excp);
+               }
+       }
+
+       private EntityTransaction getTransaction() {
+               if (!isDbConnected()) {
+                       connect();
+               }
+
+               EntityManager em = getEntityManager();
+
+               return em != null ? em.getTransaction() : null;
+       }
+
+       private boolean beginTransaction() {
+               EntityTransaction trx = getTransaction();
+
+               if (trx != null && !trx.isActive()) {
+                       trx.begin();
+               }
+
+               if (trx == null) {
+                       logger.warn("DBAuditDestination.beginTransaction(): trx 
is null");
+               }
+
+               return trx != null;
+       }
+
+       private boolean commitTransaction() {
+               boolean ret = false;
+               EntityTransaction trx = null;
+
+               try {
+                       trx = getTransaction();
+
+                       if (trx != null && trx.isActive()) {
+                               trx.commit();
+                               ret = true;
+                       } else {
+                               throw new Exception("trx is null or not 
active");
+                       }
+               } catch (Throwable excp) {
+                       logger.error("DBAuditDestination.commitTransaction(): 
failed", excp);
+
+                       cleanUp(); // so that next insert will try to init()
+               } finally {
+                       clearEntityManager();
+               }
+
+               return ret;
+       }
+
+       private boolean rollbackTransaction() {
+               boolean ret = false;
+               EntityTransaction trx = null;
+
+               try {
+                       trx = getTransaction();
+
+                       if (trx != null && trx.isActive()) {
+                               trx.rollback();
+                               ret = true;
+                       } else {
+                               throw new Exception("trx is null or not 
active");
+                       }
+               } catch (Throwable excp) {
+                       logger.error("DBAuditDestination.rollbackTransaction(): 
failed",
+                                       excp);
+
+                       cleanUp(); // so that next insert will try to init()
+               } finally {
+                       clearEntityManager();
+               }
+
+               return ret;
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
index 6ca4fce..67382a9 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
@@ -105,6 +105,9 @@ public class HDFSAuditDestination extends AuditDestination {
 
        @Override
        synchronized public boolean logJSON(Collection<String> events) {
+               if (!initDone) {
+                       return false;
+               }
                if (isStopped) {
                        logError("log() called after stop was requested. name=" 
+ getName());
                        return false;

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
index d6ef318..c3a05ce 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java
@@ -24,6 +24,7 @@ import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.ranger.audit.destination.DBAuditDestination;
 import org.apache.ranger.audit.destination.FileAuditDestination;
 import org.apache.ranger.audit.destination.HDFSAuditDestination;
 import org.apache.ranger.audit.destination.SolrAuditDestination;
@@ -415,7 +416,7 @@ public class AuditProviderFactory {
                        } else if (providerName.equals("kafka")) {
                                provider = new KafkaAuditProvider();
                        } else if (providerName.equals("db")) {
-                               provider = new DbAuditProvider();
+                               provider = new DBAuditDestination();
                        } else if (providerName.equals("log4j")) {
                                provider = new Log4jAuditProvider();
                        } else if (providerName.equals("batch")) {

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
index dd44def..09335c7 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
+
 import com.google.gson.GsonBuilder;
 
 import java.util.concurrent.atomic.AtomicLong;
@@ -33,7 +34,9 @@ import java.util.Properties;
 public abstract class BaseAuditHandler implements AuditHandler {
        private static final Log LOG = 
LogFactory.getLog(BaseAuditHandler.class);
 
-       private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP 
= "xasecure.audit.log.failure.report.min.interval.ms";
+       static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = 
"xasecure.audit.log.failure.report.min.interval.ms";
+       protected static final String AUDIT_DB_CREDENTIAL_PROVIDER_FILE   = 
"xasecure.audit.credential.provider.file";
+
 
        private int mLogFailureReportMinIntervalInMs = 60 * 1000;
 

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
index fe6b0e9..abb0a90 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java
@@ -29,6 +29,7 @@ import java.util.StringTokenizer;
 import java.util.UUID;
 
 import org.apache.log4j.helpers.LogLog;
+import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;
 
 import com.google.gson.Gson;
 import com.google.gson.GsonBuilder;
@@ -377,5 +378,19 @@ public class MiscUtil {
                }
                return list;
        }
+       
+       public static String getCredentialString(String url,String alias) {
+               String ret = null;
+
+               if(url != null && alias != null) {
+                       char[] cred = 
RangerCredentialProvider.getInstance().getCredentialString(url,alias);
+
+                       if ( cred != null ) {
+                               ret = new String(cred); 
+                       }
+               }
+               
+               return ret;
+       }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
index d16fff9..de5941a 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
@@ -102,9 +102,16 @@ public class AuditAsyncQueue extends AuditQueue implements 
Runnable {
         */
        @Override
        public void stop() {
+               logger.info("Stop called. name=" + getName());
+               if (stopTime != 0) {
+                       stopTime = System.currentTimeMillis();
+               }
                setDrain(true);
                try {
                        if (consumerThread != null) {
+                               logger.info("Interrupting consumerThread. 
name=" + getName()
+                                               + ", consumer="
+                                               + (consumer == null ? null : 
consumer.getName()));
                                consumerThread.interrupt();
                        }
                } catch (Throwable t) {
@@ -138,7 +145,7 @@ public class AuditAsyncQueue extends AuditQueue implements 
Runnable {
                                }
                        } catch (InterruptedException e) {
                                logger.info(
-                                               "Caught exception in consumer 
thread. Mostly to about loop",
+                                               "Caught exception in consumer 
thread. Mostly server is shutting down.",
                                                e);
                        } catch (Throwable t) {
                                logger.error("Caught error during processing 
request.", t);
@@ -146,13 +153,29 @@ public class AuditAsyncQueue extends AuditQueue 
implements Runnable {
                        if (isDrain() && queue.isEmpty()) {
                                break;
                        }
+                       if (isDrain()
+                                       && (stopTime - 
System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS) {
+                               logger.warn("Exiting polling loop to max time 
allowed. name="
+                                               + getName() + ", waited for "
+                                               + (stopTime - 
System.currentTimeMillis()) + " ms");
+
+                               break;
+                       }
                }
+               logger.info("Exiting polling loop. name=" + getName());
+
                try {
                        // Call stop on the consumer
+                       logger.info("Calling to stop consumer. name=" + 
getName()
+                                       + ", consumer.name=" + 
consumer.getName());
+
+                       // Call stop on the consumer
                        consumer.stop();
                } catch (Throwable t) {
                        logger.error("Error while calling stop on consumer.", 
t);
                }
+               logger.info("Exiting consumerThread.run() method. name=" + 
getName());
+
        }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
index 8316c2b..645483b 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
@@ -119,10 +119,19 @@ public class AuditBatchQueue extends AuditQueue 
implements Runnable {
         */
        @Override
        public void stop() {
+               logger.info("Stop called. name=" + getName());
+               if (stopTime != 0) {
+                       stopTime = System.currentTimeMillis();
+               }
+
                setDrain(true);
                flush();
                try {
                        if (consumerThread != null) {
+                               logger.info("Interrupting consumerThread. 
name=" + getName()
+                                               + ", consumer="
+                                               + (consumer == null ? null : 
consumer.getName()));
+
                                consumerThread.interrupt();
                        }
                } catch (Throwable t) {
@@ -257,7 +266,7 @@ public class AuditBatchQueue extends AuditQueue implements 
Runnable {
                                }
                        } catch (InterruptedException e) {
                                logger.info(
-                                               "Caught exception in consumer 
thread. Mostly to abort loop",
+                                               "Caught exception in consumer 
thread. Mostly server is shutting down.",
                                                e);
                                setDrain(true);
                        } catch (Throwable t) {
@@ -311,12 +320,24 @@ public class AuditBatchQueue extends AuditQueue 
implements Runnable {
                                        break;
                                }
                        }
+                       if (isDrain()
+                                       && (stopTime - 
System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS) {
+                               logger.warn("Exiting polling loop to max time 
allowed. name="
+                                               + getName() + ", waited for "
+                                               + (stopTime - 
System.currentTimeMillis()) + " ms");
+
+                               break;
+                       }
+
                }
 
                logger.info("Exiting consumerThread. Queue=" + getName() + ", 
dest="
                                + consumer.getName());
                try {
                        // Call stop on the consumer
+                       logger.info("Calling to stop consumer. name=" + 
getName()
+                                       + ", consumer.name=" + 
consumer.getName());
+
                        consumer.stop();
                        if (fileSpoolerEnabled) {
                                fileSpooler.stop();
@@ -324,5 +345,6 @@ public class AuditBatchQueue extends AuditQueue implements 
Runnable {
                } catch (Throwable t) {
                        logger.error("Error while calling stop on consumer.", 
t);
                }
+               logger.info("Exiting consumerThread.run() method. name=" + 
getName());
        }
 }

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
index 4c3ac5f..039dc6d 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java
@@ -33,6 +33,9 @@ public abstract class AuditQueue extends BaseAuditHandler {
        public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024;
        public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000;
        public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000;
+       
+       //This is the max time the consumer thread will wait before exiting the 
loop 
+       public static final int AUDIT_CONSUMER_THREAD_WAIT_MS = 5000;
 
        private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT;
        private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS;
@@ -57,6 +60,9 @@ public abstract class AuditQueue extends BaseAuditHandler {
        protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes
        protected int fileSpoolDrainThresholdPercent = 80;
 
+       //This is set when the first time stop is called.
+       protected long stopTime = 0;
+       
        /**
         * @param consumer
         */

http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
----------------------------------------------------------------------
diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
index 7922312..1e5b500 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
@@ -122,9 +122,18 @@ public class AuditSummaryQueue extends AuditQueue 
implements Runnable {
         */
        @Override
        public void stop() {
+               logger.info("Stop called. name=" + getName());
+               if (stopTime != 0) {
+                       stopTime = System.currentTimeMillis();
+               }
+
                setDrain(true);
                try {
                        if (consumerThread != null) {
+                               logger.info("Interrupting consumerThread. 
name=" + getName()
+                                               + ", consumer="
+                                               + (consumer == null ? null : 
consumer.getName()));
+
                                consumerThread.interrupt();
                        }
                } catch (Throwable t) {
@@ -170,7 +179,7 @@ public class AuditSummaryQueue extends AuditQueue 
implements Runnable {
                                }
                        } catch (InterruptedException e) {
                                logger.info(
-                                               "Caught exception in consumer 
thread. Mostly to about loop",
+                                               "Caught exception in consumer 
thread. Mostly server is shutting down.",
                                                e);
                        } catch (Throwable t) {
                                logger.error("Caught error during processing 
request.", t);
@@ -217,14 +226,28 @@ public class AuditSummaryQueue extends AuditQueue 
implements Runnable {
                        if (isDrain() && summaryMap.isEmpty() && 
queue.isEmpty()) {
                                break;
                        }
+                       if (isDrain()
+                                       && (stopTime - 
System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS) {
+                               logger.warn("Exiting polling loop to max time 
allowed. name="
+                                               + getName() + ", waited for "
+                                               + (stopTime - 
System.currentTimeMillis()) + " ms");
+
+                               break;
+                       }
+
                }
 
+               logger.info("Exiting polling loop. name=" + getName());
                try {
                        // Call stop on the consumer
+                       logger.info("Calling to stop consumer. name=" + 
getName()
+                                       + ", consumer.name=" + 
consumer.getName());
                        consumer.stop();
                } catch (Throwable t) {
                        logger.error("Error while calling stop on consumer.", 
t);
                }
+               logger.info("Exiting consumerThread.run() method. name=" + 
getName());
+
        }
 
        class AuditSummary {

Reply via email to