Repository: incubator-ranger
Updated Branches:
  refs/heads/master fa59f97bc -> 578b4ed2f
RANGER-1136: Ranger audit to HDFS fails with TGT errors in Ranger HiveServer2 plugin when UGI -TGT expires in audit thread

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/578b4ed2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/578b4ed2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/578b4ed2

Branch: refs/heads/master
Commit: 578b4ed2f0ce6cf53c7fe571c59a0d2e40525287
Parents: fa59f97
Author: rmani <[email protected]>
Authored: Fri Aug 5 18:43:24 2016 -0700
Committer: rmani <[email protected]>
Committed: Fri Aug 5 18:43:24 2016 -0700

----------------------------------------------------------------------
 .../audit/destination/HDFSAuditDestination.java | 30 +++++++--
 .../audit/destination/SolrAuditDestination.java | 58 ++++++++++++++++--
 .../provider/kafka/KafkaAuditProvider.java      | 64 ++++++++++++++++----
 .../audit/provider/solr/SolrAuditProvider.java  | 36 +++++++++--
 .../ranger/audit/queue/AuditAsyncQueue.java     | 19 +-----
 .../ranger/audit/queue/AuditBatchQueue.java     | 18 +-----
 .../ranger/audit/queue/AuditFileSpool.java      | 17 +----
 .../ranger/audit/queue/AuditSummaryQueue.java   | 17 +----
 8 files changed, 169 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/578b4ed2/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
index 9da97ed..07023ba 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
@@ -23,6 +23,7 @@ import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.net.URI;
+import java.security.PrivilegedExceptionAction;
 import java.util.*;
 
 import org.apache.commons.lang.StringUtils;
@@ -32,6 +33,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.provider.MiscUtil;
 import org.apache.ranger.audit.utils.RollingTimeUtil;
@@ -131,7 +133,7 @@ public class HDFSAuditDestination extends AuditDestination {
     }
 
     @Override
-    synchronized public boolean logJSON(Collection<String> events) {
+    synchronized public boolean logJSON(final Collection<String> events) {
         logStatusIfRequired();
         addTotalCount(events.size());
 
@@ -150,10 +152,26 @@ public class HDFSAuditDestination extends AuditDestination {
                 logger.debug("UGI=" + MiscUtil.getUGILoginUser()
                         + ". Will write to HDFS file=" + currentFileName);
             }
-            PrintWriter out = getLogFileStream();
-            for (String event : events) {
-                out.println(event);
+
+            PrivilegedExceptionAction<PrintWriter> action = new PrivilegedExceptionAction<PrintWriter>() {
+                @Override
+                public PrintWriter run() throws Exception {
+                    PrintWriter out = getLogFileStream();
+                    for (String event : events) {
+                        out.println(event);
+                    }
+                    return out;
+                };
+            };
+
+            PrintWriter out = null;
+            UserGroupInformation ugi = MiscUtil.getUGILoginUser();
+            if ( ugi != null) {
+                out = ugi.doAs(action);
+            } else {
+                out = action.run();
             }
+
             // flush and check the stream for errors
             if (out.checkError()) {
                 // In theory, this count may NOT be accurate as part of the messages may have been successfully written.
@@ -230,7 +248,7 @@ public class HDFSAuditDestination extends AuditDestination {
     }
 
     // Helper methods in this class
-    synchronized private PrintWriter getLogFileStream() throws Throwable {
+    synchronized private PrintWriter getLogFileStream() throws Exception {
         closeFileIfNeeded();
 
         // Either there are no open log file or the previous one has been rolled
@@ -294,7 +312,7 @@ public class HDFSAuditDestination extends AuditDestination {
     }
 
     private void createParents(Path pathLogfile, FileSystem fileSystem)
-            throws Throwable {
+            throws Exception {
         logger.info("Creating parent folder for " + pathLogfile);
         Path parentPath = pathLogfile != null ? pathLogfile.getParent() : null;


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/578b4ed2/agents-audit/src/main/java/org/apache/ranger/audit/destination/SolrAuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/SolrAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/SolrAuditDestination.java
index 738c091..5502b10 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/SolrAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/SolrAuditDestination.java
@@ -21,6 +21,7 @@ package org.apache.ranger.audit.destination;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 import org.apache.ranger.audit.provider.MiscUtil;
@@ -35,6 +36,7 @@ import org.apache.solr.common.SolrException;
 import org.apache.solr.common.SolrInputDocument;
 
 import java.lang.reflect.Field;
+import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashSet;
@@ -111,8 +113,23 @@ public class SolrAuditDestination extends AuditDestination {
                 try {
                     // Instantiate
                     HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer());
-                    CloudSolrClient solrCloudClient = new CloudSolrClient(
-                            zkHosts);
+                    final String zkhosts =zkHosts;
+                    PrivilegedExceptionAction<CloudSolrClient> action = new PrivilegedExceptionAction<CloudSolrClient>() {
+                        @Override
+                        public CloudSolrClient run() throws Exception {
+                            CloudSolrClient solrCloudClient = new CloudSolrClient(
+                                    zkhosts);
+                            return solrCloudClient;
+                        };
+                    };
+
+                    CloudSolrClient solrCloudClient = null;
+                    UserGroupInformation ugi = MiscUtil.getUGILoginUser();
+                    if (ugi != null) {
+                        solrCloudClient = ugi.doAs(action);
+                    } else {
+                        solrCloudClient = action.run();
+                    }
                     solrCloudClient.setDefaultCollection(collectionName);
                     me = solrClient = solrCloudClient;
                 } catch (Throwable t) {
@@ -126,8 +143,23 @@ public class SolrAuditDestination extends AuditDestination {
                 try {
                     LOG.info("Connecting to Solr using URLs=" + solrURLs);
                     HttpClientUtil.setConfigurer(new Krb5HttpClientConfigurer());
-                    LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(
-                            solrURLs.get(0));
+                    final List<String> solrUrls = solrURLs;
+                    PrivilegedExceptionAction<LBHttpSolrClient> action = new PrivilegedExceptionAction<LBHttpSolrClient>() {
+                        @Override
+                        public LBHttpSolrClient run() throws Exception {
+                            LBHttpSolrClient lbSolrClient = new LBHttpSolrClient(
+                                    solrUrls.get(0));
+                            return lbSolrClient;
+                        };
+                    };
+
+                    LBHttpSolrClient lbSolrClient = null;
+                    UserGroupInformation ugi = MiscUtil.getUGILoginUser();
+                    if (ugi != null) {
+                        lbSolrClient = ugi.doAs(action);
+                    } else {
+                        lbSolrClient = action.run();
+                    }
                     lbSolrClient.setConnectionTimeout(1000);
 
                     for (int i = 1; i < solrURLs.size(); i++) {
@@ -195,7 +227,7 @@ public class SolrAuditDestination extends AuditDestination {
             }
         }
 
-        Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
+        final Collection<SolrInputDocument> docs = new ArrayList<SolrInputDocument>();
         for (AuditEventBase event : events) {
             AuthzAuditEvent authzEvent = (AuthzAuditEvent) event;
             // Convert AuditEventBase to Solr document
@@ -203,7 +235,21 @@
             docs.add(document);
         }
         try {
-            UpdateResponse response = solrClient.add(docs);
+            PrivilegedExceptionAction<UpdateResponse> action = new PrivilegedExceptionAction<UpdateResponse>() {
+                @Override
+                public UpdateResponse run() throws Exception {
+                    UpdateResponse response = solrClient.add(docs);
+                    return response;
+                };
+            };
+
+            UpdateResponse response = null;
+            UserGroupInformation ugi = MiscUtil.getUGILoginUser();
+            if (ugi != null) {
+                response = ugi.doAs(action);
+            } else {
+                response = action.run();
+            }
             if (response.getStatus() != 0) {
                 addFailedCount(events.size());
                 logFailedEvent(events, response.toString());


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/578b4ed2/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
index 2c77b40..915c965 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java
@@ -16,13 +16,17 @@
  */
 package org.apache.ranger.audit.provider.kafka;
 
+import java.security.PrivilegedAction;
+import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Properties;
 
-import kafka.javaapi.producer.Producer;
-import kafka.producer.KeyedMessage;
-import kafka.producer.ProducerConfig;
-
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.Producer;
+import org.apache.kafka.clients.producer.ProducerRecord;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.ranger.audit.destination.AuditDestination;
@@ -30,6 +34,7 @@ import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
 import org.apache.ranger.audit.provider.MiscUtil;
 
+
 public class KafkaAuditProvider extends AuditDestination {
     private static final Log LOG = LogFactory.getLog(KafkaAuditProvider.class);
 
@@ -61,8 +66,7 @@ public class KafkaAuditProvider extends AuditDestination {
                     brokerList = "localhost:9092";
                 }
 
-                Properties kakfaProps = new Properties();
-
+                final Map<String, Object> kakfaProps = new HashMap<String,Object>();
                 kakfaProps.put("metadata.broker.list", brokerList);
                 kakfaProps.put("serializer.class",
                         "kafka.serializer.StringEncoder");
@@ -73,8 +77,20 @@ public class KafkaAuditProvider extends AuditDestination {
                 LOG.info("Connecting to Kafka producer using properties:"
                         + kakfaProps.toString());
 
-                ProducerConfig kafkaConfig = new ProducerConfig(kakfaProps);
-                producer = new Producer<String, String>(kafkaConfig);
+                PrivilegedAction<Producer<String, String>> action = new PrivilegedAction<Producer<String, String>>() {
+                    @Override
+                    public Producer<String, String> run(){
+                        Producer<String, String> producer = new KafkaProducer<String, String>(kakfaProps);
+                        return producer;
+                    };
+                };
+
+                UserGroupInformation ugi = MiscUtil.getUGILoginUser();
+                if ( ugi != null) {
+                    producer = ugi.doAs(action);
+                } else {
+                    producer = action.run();
+                }
                 initDone = true;
             }
         } catch (Throwable t) {
@@ -105,9 +121,22 @@ public class KafkaAuditProvider extends AuditDestination {
             if (producer != null) {
                 // TODO: Add partition key
-                KeyedMessage<String, String> keyedMessage = new KeyedMessage<String, String>(
+                final ProducerRecord<String, String> keyedMessage = new ProducerRecord<String, String>(
                         topic, message);
-                producer.send(keyedMessage);
+                PrivilegedAction<Void> action = new PrivilegedAction<Void>() {
+                    @Override
+                    public Void run(){
+                        producer.send(keyedMessage);
+                        return null;
+                    };
+                };
+
+                UserGroupInformation ugi = MiscUtil.getUGILoginUser();
+                if ( ugi != null) {
+                    ugi.doAs(action);
+                } else {
+                    action.run();
+                }
             } else {
                 LOG.info("AUDIT LOG (Kafka Down):" + message);
             }
@@ -154,7 +183,20 @@ public class KafkaAuditProvider extends AuditDestination {
         LOG.info("stop() called");
         if (producer != null) {
             try {
-                producer.close();
+                PrivilegedExceptionAction<Void> action = new PrivilegedExceptionAction<Void>() {
+                    @Override
+                    public Void run() throws Exception{
+                        producer.close();
+                        return null;
+                    };
+                };
+                MiscUtil.getUGILoginUser().doAs(action);
+                UserGroupInformation ugi = MiscUtil.getUGILoginUser();
+                if ( ugi != null) {
+                    ugi.doAs(action);
+                } else {
+                    action.run();
+                }
             } catch (Throwable t) {
                 LOG.error("Error closing Kafka producer");
             }


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/578b4ed2/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
index 8b42be0..376865e 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java
@@ -19,12 +19,14 @@
 
 package org.apache.ranger.audit.provider.solr;
 
+import java.security.PrivilegedExceptionAction;
 import java.util.Collection;
 import java.util.Date;
 import java.util.Properties;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.ranger.audit.destination.AuditDestination;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.model.AuthzAuditEvent;
@@ -66,7 +68,7 @@ public class SolrAuditProvider extends AuditDestination {
         synchronized (lock) {
             me = solrClient;
             if (me == null) {
-                String solrURL = MiscUtil.getStringProperty(props,
+                final String solrURL = MiscUtil.getStringProperty(props,
                         "xasecure.audit.solr.solr_url");
 
                 if (lastConnectTime != null) {
@@ -91,7 +93,20 @@ public class SolrAuditProvider extends AuditDestination {
                 try {
                     // TODO: Need to support SolrCloud also
-                    me = solrClient = new HttpSolrClient(solrURL);
+                    PrivilegedExceptionAction<SolrClient> action = new PrivilegedExceptionAction<SolrClient>() {
+                        @Override
+                        public SolrClient run() throws Exception {
+                            SolrClient solrClient = new HttpSolrClient(solrURL);
+                            return solrClient;
+                        };
+                    };
+                    UserGroupInformation ugi = MiscUtil.getUGILoginUser();
+                    if (ugi != null) {
+                        solrClient = ugi.doAs(action);
+                    } else {
+                        solrClient = action.run();
+                    }
+                    me = solrClient;
                     if (solrClient instanceof HttpSolrClient) {
                         HttpSolrClient httpSolrClient = (HttpSolrClient) solrClient;
                         httpSolrClient.setAllowCompression(true);
@@ -157,8 +172,21 @@ public class SolrAuditProvider extends AuditDestination {
                 }
             }
             // Convert AuditEventBase to Solr document
-            SolrInputDocument document = toSolrDoc(authzEvent);
-            UpdateResponse response = solrClient.add(document);
+            final SolrInputDocument document = toSolrDoc(authzEvent);
+            UpdateResponse response = null;
+            PrivilegedExceptionAction<UpdateResponse> action = new PrivilegedExceptionAction<UpdateResponse>() {
+                @Override
+                public UpdateResponse run() throws Exception {
+                    UpdateResponse response = solrClient.add(document);
+                    return response;
+                };
+            };
+            UserGroupInformation ugi = MiscUtil.getUGILoginUser();
+            if (ugi != null) {
+                response = ugi.doAs(action);
+            } else {
+                response = action.run();
+            }
 
             if (response.getStatus() != 0) {
                 lastFailTime = System.currentTimeMillis();


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/578b4ed2/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
index 26c690e..34712bf 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java
@@ -19,7 +19,6 @@ package org.apache.ranger.audit.queue;
 
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.concurrent.LinkedBlockingQueue;
@@ -29,7 +28,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.MDC;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.provider.AuditHandler;
-import org.apache.ranger.audit.provider.MiscUtil;
 
 /**
  * This is a non-blocking queue with no limit on capacity.
@@ -130,26 +128,13 @@ public class AuditAsyncQueue extends AuditQueue implements Runnable {
         try {
             //This is done to clear the MDC context to avoid issue with Ranger Auditing for Knox
             MDC.clear();
-            if (isConsumerDestination && MiscUtil.getUGILoginUser() != null) {
-                PrivilegedAction<Void> action = new PrivilegedAction<Void>() {
-                    public Void run() {
-                        runDoAs();
-                        return null;
-                    };
-                };
-                logger.info("Running queue " + getName() + " as user "
-                        + MiscUtil.getUGILoginUser());
-                MiscUtil.getUGILoginUser().doAs(action);
-            } else {
-                runDoAs();
-            }
-
+            runLogAudit();
         } catch (Throwable t) {
             logger.fatal("Exited thread abnormaly. queue=" + getName(), t);
         }
     }
 
-    public void runDoAs() {
+    public void runLogAudit() {
         while (true) {
             try {
                 AuditEventBase event = null;


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/578b4ed2/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
index 07c8819..95938f8 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
@@ -19,7 +19,6 @@ package org.apache.ranger.audit.queue;
 
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Properties;
@@ -32,7 +31,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.MDC;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.provider.AuditHandler;
-import org.apache.ranger.audit.provider.MiscUtil;
 
 public class AuditBatchQueue extends AuditQueue implements Runnable {
     private static final Log logger = LogFactory.getLog(AuditBatchQueue.class);
@@ -214,25 +212,13 @@ public class AuditBatchQueue extends AuditQueue implements Runnable {
         try {
             //This is done to clear the MDC context to avoid issue with Ranger Auditing for Knox
             MDC.clear();
-            if (isConsumerDestination && MiscUtil.getUGILoginUser() != null) {
-                PrivilegedAction<Void> action = new PrivilegedAction<Void>() {
-                    public Void run() {
-                        runDoAs();
-                        return null;
-                    };
-                };
-                logger.info("Running queue " + getName() + " as user "
-                        + MiscUtil.getUGILoginUser());
-                MiscUtil.getUGILoginUser().doAs(action);
-            } else {
-                runDoAs();
-            }
+            runLogAudit();
         } catch (Throwable t) {
             logger.fatal("Exited thread abnormaly. queue=" + getName(), t);
         }
     }
 
-    public void runDoAs() {
+    public void runLogAudit() {
         long lastDispatchTime = System.currentTimeMillis();
         boolean isDestActive = true;
         while (true) {


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/578b4ed2/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
index fc4ff5d..c0a05ec 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java
@@ -28,7 +28,6 @@ import java.io.FileReader;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintWriter;
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -756,26 +755,14 @@ public class AuditFileSpool implements Runnable {
         try {
             //This is done to clear the MDC context to avoid issue with Ranger Auditing for Knox
             MDC.clear();
-            if (MiscUtil.getUGILoginUser() != null) {
-                PrivilegedAction<Void> action = new PrivilegedAction<Void>() {
-                    public Void run() {
-                        runDoAs();
-                        return null;
-                    };
-                };
-                logger.info("Running fileSpool " + consumerProvider.getName()
-                        + " as user " + MiscUtil.getUGILoginUser());
-                MiscUtil.getUGILoginUser().doAs(action);
-            } else {
-                runDoAs();
-            }
+            runLogAudit();
         } catch (Throwable t) {
             logger.fatal("Exited thread without abnormaly. queue="
                     + consumerProvider.getName(), t);
         }
     }
 
-    public void runDoAs() {
+    public void runLogAudit() {
         // boolean isResumed = false;
         while (true) {
             try {


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/578b4ed2/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
index 14d0ab6..b4505f1 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java
@@ -19,7 +19,6 @@ package org.apache.ranger.audit.queue;
 
-import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Date;
@@ -150,26 +149,14 @@ public class AuditSummaryQueue extends AuditQueue implements Runnable {
         try {
             //This is done to clear the MDC context to avoid issue with Ranger Auditing for Knox
             MDC.clear();
-            if (isConsumerDestination && MiscUtil.getUGILoginUser() != null) {
-                PrivilegedAction<Void> action = new PrivilegedAction<Void>() {
-                    public Void run() {
-                        runDoAs();
-                        return null;
-                    };
-                };
-                logger.info("Running queue " + getName() + " as user "
-                        + MiscUtil.getUGILoginUser());
-                MiscUtil.getUGILoginUser().doAs(action);
-            } else {
-                runDoAs();
-            }
+            runLogAudit();
         } catch (Throwable t) {
             logger.fatal("Exited thread without abnormaly. queue="
                     + getName(), t);
        }
     }
 
-    public void runDoAs() {
+    public void runLogAudit() {
         long lastDispatchTime = System.currentTimeMillis();
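

Note (not part of the commit): every destination touched above applies the same pattern -- the Kerberos-sensitive call is wrapped in a PrivilegedExceptionAction and executed through UserGroupInformation.doAs() when a login UGI is available, and invoked directly otherwise, so the audit thread runs with the login user's renewable TGT instead of failing once the original ticket expires. The following is a minimal sketch of that pattern, assuming only hadoop-common on the classpath; the class and helper names (UgiDoAsSketch, runAsLoginUser) are illustrative, and it uses the stock UserGroupInformation.getLoginUser() where the patch uses Ranger's MiscUtil.getUGILoginUser() helper.

// Sketch only: illustrates the doAs wrapping used by this patch; not Ranger code.
import java.security.PrivilegedExceptionAction;

import org.apache.hadoop.security.UserGroupInformation;

public final class UgiDoAsSketch {

    private UgiDoAsSketch() {
    }

    // Runs the action under the Kerberos login UGI when one exists, so a renewed
    // TGT held by the login user is honored inside the calling (audit) thread.
    public static <T> T runAsLoginUser(final PrivilegedExceptionAction<T> action) throws Exception {
        UserGroupInformation ugi = UserGroupInformation.getLoginUser();
        if (ugi != null) {
            return ugi.doAs(action);
        }
        // Defensive fallback, mirroring the null check in the patch.
        return action.run();
    }

    // Usage example: the "work" here just reports the effective user, in the same
    // way the patch wraps the HDFS PrintWriter, Solr client, and Kafka producer calls.
    public static void main(String[] args) throws Exception {
        String user = runAsLoginUser(new PrivilegedExceptionAction<String>() {
            @Override
            public String run() throws Exception {
                return UserGroupInformation.getCurrentUser().getShortUserName();
            }
        });
        System.out.println("Audit work ran as: " + user);
    }
}

In practice getLoginUser() rarely returns null (it falls back to the OS user when no Kerberos login exists), so the direct-call branch is mainly a safeguard, just as it is in the patched destinations.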
