Repository: incubator-ranger Updated Branches: refs/heads/master 9e5bd8540 -> 94ba6beb3
RANGER-397 Support RDBMS as audit destination using V3 configuration Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/a2de2450 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/a2de2450 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/a2de2450 Branch: refs/heads/master Commit: a2de2450a572468af1928d5d021567c39544e193 Parents: 9e5bd85 Author: Don Bosco Durai <[email protected]> Authored: Fri May 29 14:54:22 2015 -0700 Committer: Don Bosco Durai <[email protected]> Committed: Fri May 29 14:54:22 2015 -0700 ---------------------------------------------------------------------- .../org/apache/ranger/audit/dao/DaoManager.java | 2 + .../audit/destination/DBAuditDestination.java | 306 +++++++++++++++++++ .../audit/destination/HDFSAuditDestination.java | 3 + .../audit/provider/AuditProviderFactory.java | 3 +- .../ranger/audit/provider/BaseAuditHandler.java | 5 +- .../apache/ranger/audit/provider/MiscUtil.java | 15 + .../ranger/audit/queue/AuditAsyncQueue.java | 25 +- .../ranger/audit/queue/AuditBatchQueue.java | 24 +- .../apache/ranger/audit/queue/AuditQueue.java | 6 + .../ranger/audit/queue/AuditSummaryQueue.java | 25 +- 10 files changed, 409 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java b/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java index 6d81744..fd4d096 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/dao/DaoManager.java @@ -49,6 +49,8 @@ public class DaoManager extends DaoManagerBase { sEntityManager.set(em); } + } else { + logger.error("EntityManagerFactory was not set in this thread.", new Throwable()); } return em; http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java new file mode 100644 index 0000000..c58748e --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/DBAuditDestination.java @@ -0,0 +1,306 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.destination; + +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +import javax.persistence.EntityManager; +import javax.persistence.EntityManagerFactory; +import javax.persistence.EntityTransaction; +import javax.persistence.Persistence; + +import org.apache.ranger.audit.dao.DaoManager; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.MiscUtil; + +public class DBAuditDestination extends AuditDestination { + + private static final Log logger = LogFactory + .getLog(DBAuditDestination.class); + + public static final String PROP_DB_JDBC_DRIVER = "jdbc.driver"; + public static final String PROP_DB_JDBC_URL = "jdbc.url"; + public static final String PROP_DB_USER = "user"; + public static final String PROP_DB_PASSWORD = "password"; + public static final String PROP_DB_PASSWORD_ALIAS = "password.alias"; + + private EntityManagerFactory entityManagerFactory; + private DaoManager daoManager; + + private String jdbcDriver = null; + private String jdbcURL = null; + private String dbUser = null; + private String dbPasswordAlias = "auditDBCred"; + + public DBAuditDestination() { + logger.info("DBAuditDestination() called"); + } + + @Override + public void init(Properties props, String propPrefix) { + logger.info("init() called"); + super.init(props, propPrefix); + + // Initial connect + connect(); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditHandler#logger(java.util.Collection + * ) + */ + @Override + public boolean log(Collection<AuditEventBase> events) { + boolean retValue = false; + + if (!beginTransaction()) { + return false; + } + boolean isFailed = false; + for (AuditEventBase event : events) { + try { + event.persist(daoManager); + } catch (Throwable t) { + logger.error("Error persisting data. event=" + event, t); + isFailed = true; + break; + } + } + if (isFailed) { + retValue = false; + rollbackTransaction(); + } else { + retValue = commitTransaction(); + } + return retValue; + } + + @Override + public void stop() { + cleanUp(); + super.stop(); + } + + // Local methods + protected void connect() { + if (isDbConnected()) { + return; + } + try { + jdbcDriver = MiscUtil.getStringProperty(props, propPrefix + "." + + PROP_DB_JDBC_DRIVER); + jdbcURL = MiscUtil.getStringProperty(props, propPrefix + "." + + PROP_DB_JDBC_URL); + dbUser = MiscUtil.getStringProperty(props, propPrefix + "." + + PROP_DB_USER); + String dbPassword = MiscUtil.getStringProperty(props, propPrefix + + "." + PROP_DB_PASSWORD); + String tmpAlias = MiscUtil.getStringProperty(props, propPrefix + + "." + PROP_DB_PASSWORD_ALIAS); + dbPasswordAlias = tmpAlias != null ? tmpAlias : dbPasswordAlias; + String credFile = MiscUtil.getStringProperty(props, + AUDIT_DB_CREDENTIAL_PROVIDER_FILE); + + if (jdbcDriver == null || jdbcDriver.isEmpty()) { + logger.fatal("JDBC driver not provided. Set property name " + + propPrefix + "." + PROP_DB_JDBC_DRIVER); + return; + } + if (jdbcURL == null || jdbcURL.isEmpty()) { + logger.fatal("JDBC URL not provided. Set property name " + + propPrefix + "." + PROP_DB_JDBC_URL); + return; + } + if (dbUser == null || dbUser.isEmpty()) { + logger.fatal("DB user not provided. Set property name " + + propPrefix + "." + PROP_DB_USER); + return; + } + if (dbPassword == null || dbPassword.isEmpty()) { + logger.warn("DB password not provided. Will assume empty for now. Set property name " + + propPrefix + "." + PROP_DB_PASSWORD); + } else { + dbPassword = MiscUtil.getCredentialString(credFile, + dbPasswordAlias); + } + logger.info("JDBC Driver=" + jdbcDriver + ", JDBC URL=" + jdbcURL + + ", dbUser=" + dbUser + ", passwordAlias=" + + dbPasswordAlias + ", credFile=" + credFile); + + Map<String, String> dbProperties = new HashMap<String, String>(); + dbProperties.put("javax.persistence.jdbc.driver", jdbcDriver); + dbProperties.put("javax.persistence.jdbc.url", jdbcURL); + dbProperties.put("javax.persistence.jdbc.user", dbUser); + if (dbPassword != null) { + dbProperties.put("javax.persistence.jdbc.password", dbPassword); + } + + entityManagerFactory = Persistence.createEntityManagerFactory( + "xa_server", dbProperties); + + logger.info("entityManagerFactory=" + entityManagerFactory); + + daoManager = new DaoManager(); + daoManager.setEntityManagerFactory(entityManagerFactory); + + // this forces the connection to be made to DB + if (daoManager.getEntityManager() != null) { + logger.error("Error connecting audit database. EntityManager is null. dbURL=" + + jdbcURL + ", dbUser=" + dbUser); + } + + } catch (Throwable t) { + logger.error("Error connecting audit database. dbURL=" + jdbcURL + + ", dbUser=" + dbUser, t); + } + } + + private synchronized void cleanUp() { + logger.info("DBAuditDestination: cleanUp()"); + + try { + if (entityManagerFactory != null && entityManagerFactory.isOpen()) { + entityManagerFactory.close(); + } + } catch (Exception excp) { + logger.error("DBAuditDestination.cleanUp(): failed", excp); + } finally { + entityManagerFactory = null; + daoManager = null; + } + } + + private EntityManager getEntityManager() { + DaoManager daoMgr = daoManager; + + if (daoMgr != null) { + try { + return daoMgr.getEntityManager(); + } catch (Exception excp) { + logger.error("DBAuditDestination.getEntityManager(): failed", + excp); + + cleanUp(); + } + } + + return null; + } + + private boolean isDbConnected() { + EntityManager em = getEntityManager(); + return em != null && em.isOpen(); + } + + private void clearEntityManager() { + try { + EntityManager em = getEntityManager(); + + if (em != null) { + em.clear(); + } + } catch (Exception excp) { + logger.warn("DBAuditDestination.clearEntityManager(): failed", excp); + } + } + + private EntityTransaction getTransaction() { + if (!isDbConnected()) { + connect(); + } + + EntityManager em = getEntityManager(); + + return em != null ? em.getTransaction() : null; + } + + private boolean beginTransaction() { + EntityTransaction trx = getTransaction(); + + if (trx != null && !trx.isActive()) { + trx.begin(); + } + + if (trx == null) { + logger.warn("DBAuditDestination.beginTransaction(): trx is null"); + } + + return trx != null; + } + + private boolean commitTransaction() { + boolean ret = false; + EntityTransaction trx = null; + + try { + trx = getTransaction(); + + if (trx != null && trx.isActive()) { + trx.commit(); + ret = true; + } else { + throw new Exception("trx is null or not active"); + } + } catch (Throwable excp) { + logger.error("DBAuditDestination.commitTransaction(): failed", excp); + + cleanUp(); // so that next insert will try to init() + } finally { + clearEntityManager(); + } + + return ret; + } + + private boolean rollbackTransaction() { + boolean ret = false; + EntityTransaction trx = null; + + try { + trx = getTransaction(); + + if (trx != null && trx.isActive()) { + trx.rollback(); + ret = true; + } else { + throw new Exception("trx is null or not active"); + } + } catch (Throwable excp) { + logger.error("DBAuditDestination.rollbackTransaction(): failed", + excp); + + cleanUp(); // so that next insert will try to init() + } finally { + clearEntityManager(); + } + + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java index 6ca4fce..67382a9 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java @@ -105,6 +105,9 @@ public class HDFSAuditDestination extends AuditDestination { @Override synchronized public boolean logJSON(Collection<String> events) { + if (!initDone) { + return false; + } if (isStopped) { logError("log() called after stop was requested. name=" + getName()); return false; http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java index d6ef318..c3a05ce 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java @@ -24,6 +24,7 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.destination.DBAuditDestination; import org.apache.ranger.audit.destination.FileAuditDestination; import org.apache.ranger.audit.destination.HDFSAuditDestination; import org.apache.ranger.audit.destination.SolrAuditDestination; @@ -415,7 +416,7 @@ public class AuditProviderFactory { } else if (providerName.equals("kafka")) { provider = new KafkaAuditProvider(); } else if (providerName.equals("db")) { - provider = new DbAuditProvider(); + provider = new DBAuditDestination(); } else if (providerName.equals("log4j")) { provider = new Log4jAuditProvider(); } else if (providerName.equals("batch")) { http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java index dd44def..09335c7 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.audit.model.AuditEventBase; import org.apache.ranger.audit.model.AuthzAuditEvent; + import com.google.gson.GsonBuilder; import java.util.concurrent.atomic.AtomicLong; @@ -33,7 +34,9 @@ import java.util.Properties; public abstract class BaseAuditHandler implements AuditHandler { private static final Log LOG = LogFactory.getLog(BaseAuditHandler.class); - private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms"; + static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms"; + protected static final String AUDIT_DB_CREDENTIAL_PROVIDER_FILE = "xasecure.audit.credential.provider.file"; + private int mLogFailureReportMinIntervalInMs = 60 * 1000; http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java index fe6b0e9..abb0a90 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java @@ -29,6 +29,7 @@ import java.util.StringTokenizer; import java.util.UUID; import org.apache.log4j.helpers.LogLog; +import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -377,5 +378,19 @@ public class MiscUtil { } return list; } + + public static String getCredentialString(String url,String alias) { + String ret = null; + + if(url != null && alias != null) { + char[] cred = RangerCredentialProvider.getInstance().getCredentialString(url,alias); + + if ( cred != null ) { + ret = new String(cred); + } + } + + return ret; + } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java index d16fff9..de5941a 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java @@ -102,9 +102,16 @@ public class AuditAsyncQueue extends AuditQueue implements Runnable { */ @Override public void stop() { + logger.info("Stop called. name=" + getName()); + if (stopTime != 0) { + stopTime = System.currentTimeMillis(); + } setDrain(true); try { if (consumerThread != null) { + logger.info("Interrupting consumerThread. name=" + getName() + + ", consumer=" + + (consumer == null ? null : consumer.getName())); consumerThread.interrupt(); } } catch (Throwable t) { @@ -138,7 +145,7 @@ public class AuditAsyncQueue extends AuditQueue implements Runnable { } } catch (InterruptedException e) { logger.info( - "Caught exception in consumer thread. Mostly to about loop", + "Caught exception in consumer thread. Mostly server is shutting down.", e); } catch (Throwable t) { logger.error("Caught error during processing request.", t); @@ -146,13 +153,29 @@ public class AuditAsyncQueue extends AuditQueue implements Runnable { if (isDrain() && queue.isEmpty()) { break; } + if (isDrain() + && (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS) { + logger.warn("Exiting polling loop to max time allowed. name=" + + getName() + ", waited for " + + (stopTime - System.currentTimeMillis()) + " ms"); + + break; + } } + logger.info("Exiting polling loop. name=" + getName()); + try { // Call stop on the consumer + logger.info("Calling to stop consumer. name=" + getName() + + ", consumer.name=" + consumer.getName()); + + // Call stop on the consumer consumer.stop(); } catch (Throwable t) { logger.error("Error while calling stop on consumer.", t); } + logger.info("Exiting consumerThread.run() method. name=" + getName()); + } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java index 8316c2b..645483b 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java @@ -119,10 +119,19 @@ public class AuditBatchQueue extends AuditQueue implements Runnable { */ @Override public void stop() { + logger.info("Stop called. name=" + getName()); + if (stopTime != 0) { + stopTime = System.currentTimeMillis(); + } + setDrain(true); flush(); try { if (consumerThread != null) { + logger.info("Interrupting consumerThread. name=" + getName() + + ", consumer=" + + (consumer == null ? null : consumer.getName())); + consumerThread.interrupt(); } } catch (Throwable t) { @@ -257,7 +266,7 @@ public class AuditBatchQueue extends AuditQueue implements Runnable { } } catch (InterruptedException e) { logger.info( - "Caught exception in consumer thread. Mostly to abort loop", + "Caught exception in consumer thread. Mostly server is shutting down.", e); setDrain(true); } catch (Throwable t) { @@ -311,12 +320,24 @@ public class AuditBatchQueue extends AuditQueue implements Runnable { break; } } + if (isDrain() + && (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS) { + logger.warn("Exiting polling loop to max time allowed. name=" + + getName() + ", waited for " + + (stopTime - System.currentTimeMillis()) + " ms"); + + break; + } + } logger.info("Exiting consumerThread. Queue=" + getName() + ", dest=" + consumer.getName()); try { // Call stop on the consumer + logger.info("Calling to stop consumer. name=" + getName() + + ", consumer.name=" + consumer.getName()); + consumer.stop(); if (fileSpoolerEnabled) { fileSpooler.stop(); @@ -324,5 +345,6 @@ public class AuditBatchQueue extends AuditQueue implements Runnable { } catch (Throwable t) { logger.error("Error while calling stop on consumer.", t); } + logger.info("Exiting consumerThread.run() method. name=" + getName()); } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java index 4c3ac5f..039dc6d 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java @@ -33,6 +33,9 @@ public abstract class AuditQueue extends BaseAuditHandler { public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024; public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000; public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000; + + //This is the max time the consumer thread will wait before exiting the loop + public static final int AUDIT_CONSUMER_THREAD_WAIT_MS = 5000; private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT; private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS; @@ -57,6 +60,9 @@ public abstract class AuditQueue extends BaseAuditHandler { protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes protected int fileSpoolDrainThresholdPercent = 80; + //This is set when the first time stop is called. + protected long stopTime = 0; + /** * @param consumer */ http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/a2de2450/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java index 7922312..1e5b500 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java @@ -122,9 +122,18 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable { */ @Override public void stop() { + logger.info("Stop called. name=" + getName()); + if (stopTime != 0) { + stopTime = System.currentTimeMillis(); + } + setDrain(true); try { if (consumerThread != null) { + logger.info("Interrupting consumerThread. name=" + getName() + + ", consumer=" + + (consumer == null ? null : consumer.getName())); + consumerThread.interrupt(); } } catch (Throwable t) { @@ -170,7 +179,7 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable { } } catch (InterruptedException e) { logger.info( - "Caught exception in consumer thread. Mostly to about loop", + "Caught exception in consumer thread. Mostly server is shutting down.", e); } catch (Throwable t) { logger.error("Caught error during processing request.", t); @@ -217,14 +226,28 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable { if (isDrain() && summaryMap.isEmpty() && queue.isEmpty()) { break; } + if (isDrain() + && (stopTime - System.currentTimeMillis()) > AUDIT_CONSUMER_THREAD_WAIT_MS) { + logger.warn("Exiting polling loop to max time allowed. name=" + + getName() + ", waited for " + + (stopTime - System.currentTimeMillis()) + " ms"); + + break; + } + } + logger.info("Exiting polling loop. name=" + getName()); try { // Call stop on the consumer + logger.info("Calling to stop consumer. name=" + getName() + + ", consumer.name=" + consumer.getName()); consumer.stop(); } catch (Throwable t) { logger.error("Error while calling stop on consumer.", t); } + logger.info("Exiting consumerThread.run() method. name=" + getName()); + } class AuditSummary {
