RANGER-397 - Implement reliable streaming audits to configurable destinations - Incorporate Review Feedback
Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/4f3cea22 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/4f3cea22 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/4f3cea22 Branch: refs/heads/master Commit: 4f3cea223b9bb717577732bf050bc78f16e94a69 Parents: 42a0e25 Author: Don Bosco Durai <[email protected]> Authored: Wed Apr 22 09:49:01 2015 -0700 Committer: Don Bosco Durai <[email protected]> Committed: Wed Apr 22 09:49:01 2015 -0700 ---------------------------------------------------------------------- .../audit/destination/AuditDestination.java | 32 +- .../audit/destination/FileAuditDestination.java | 33 +- .../audit/destination/HDFSAuditDestination.java | 50 ++- .../ranger/audit/model/AuditEventBase.java | 4 +- .../audit/provider/AsyncAuditProvider.java | 60 +-- .../ranger/audit/provider/AuditHandler.java | 46 ++ .../ranger/audit/provider/AuditProvider.java | 56 --- .../audit/provider/AuditProviderFactory.java | 142 +++--- .../ranger/audit/provider/BaseAuditHandler.java | 271 ++++++++++++ .../audit/provider/BaseAuditProvider.java | 432 ------------------- .../audit/provider/BufferedAuditProvider.java | 12 +- .../ranger/audit/provider/DbAuditProvider.java | 10 - .../audit/provider/DummyAuditProvider.java | 35 +- .../audit/provider/Log4jAuditProvider.java | 2 - .../audit/provider/MultiDestAuditProvider.java | 59 +-- .../provider/kafka/KafkaAuditProvider.java | 22 +- .../audit/provider/solr/SolrAuditProvider.java | 33 +- .../ranger/audit/queue/AuditAsyncQueue.java | 34 +- .../ranger/audit/queue/AuditBatchQueue.java | 26 +- .../ranger/audit/queue/AuditFileSpool.java | 57 ++- .../apache/ranger/audit/queue/AuditQueue.java | 174 ++++++++ .../ranger/audit/queue/AuditSummaryQueue.java | 49 +-- .../apache/ranger/audit/test/TestEvents.java | 4 +- .../org/apache/ranger/audit/TestAuditQueue.java | 98 +++-- .../org/apache/ranger/audit/TestConsumer.java | 46 +- 25 files changed, 815 insertions(+), 972 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java index 25c0220..9db8937 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java @@ -23,13 +23,13 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.ranger.audit.provider.BaseAuditProvider; +import org.apache.ranger.audit.provider.BaseAuditHandler; /** * This class needs to be extended by anyone who wants to build custom * destination */ -public abstract class AuditDestination extends BaseAuditProvider { +public abstract class AuditDestination extends BaseAuditHandler { private static final Log logger = LogFactory.getLog(AuditDestination.class); public AuditDestination() { @@ -51,21 +51,31 @@ public abstract class AuditDestination extends BaseAuditProvider { /* * (non-Javadoc) * - * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() + * @see org.apache.ranger.audit.provider.AuditProvider#flush() */ @Override - public boolean isFlushPending() { - return false; + public void flush() { + } - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#flush() - */ @Override - public void flush() { + public void start() { + + } + + @Override + public void stop() { + + } + @Override + public void waitToComplete() { + } + @Override + public void waitToComplete(long timeout) { + + } + } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java index 1ccfd5f..a132cdf 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java @@ -21,9 +21,7 @@ package org.apache.ranger.audit.destination; import java.io.BufferedWriter; import java.io.File; -import java.io.FileNotFoundException; import java.io.FileWriter; -import java.io.IOException; import java.io.PrintWriter; import java.util.ArrayList; import java.util.Collection; @@ -107,7 +105,12 @@ public class FileAuditDestination extends AuditDestination { } @Override - public boolean logJSON(Collection<String> events) { + synchronized public boolean logJSON(Collection<String> events) { + if (isStopped) { + logError("log() called after stop was requested. name=" + getName()); + return false; + } + try { PrintWriter out = getLogFileStream(); for (String event : events) { @@ -128,7 +131,7 @@ public class FileAuditDestination extends AuditDestination { * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection) */ @Override - synchronized public boolean log(Collection<AuditEventBase> events) { + public boolean log(Collection<AuditEventBase> events) { if (isStopped) { logError("log() called after stop was requested. name=" + getName()); return false; @@ -158,11 +161,16 @@ public class FileAuditDestination extends AuditDestination { @Override synchronized public void stop() { + isStopped = true; if (logWriter != null) { - logWriter.flush(); - logWriter.close(); + try { + logWriter.flush(); + logWriter.close(); + } catch (Throwable t) { + logger.error("Error on closing log writter. Exception will be ignored. name=" + + getName() + ", fileName=" + currentFileName); + } logWriter = null; - isStopped = true; } } @@ -214,15 +222,20 @@ public class FileAuditDestination extends AuditDestination { return logWriter; } - private void closeFileIfNeeded() throws FileNotFoundException, IOException { + private void closeFileIfNeeded() { if (logWriter == null) { return; } if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) { logger.info("Closing file. Rolling over. name=" + getName() + ", fileName=" + currentFileName); - logWriter.flush(); - logWriter.close(); + try { + logWriter.flush(); + logWriter.close(); + } catch (Throwable t) { + logger.error("Error on closing log writter. Exception will be ignored. name=" + + getName() + ", fileName=" + currentFileName); + } logWriter = null; currentFileName = null; } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java index 706eb8e..6ca4fce 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java @@ -74,6 +74,12 @@ public class HDFSAuditDestination extends AuditDestination { // Initial folder and file properties String logFolderProp = MiscUtil.getStringProperty(props, propPrefix + "." + PROP_HDFS_DIR); + if (logFolderProp == null || logFolderProp.isEmpty()) { + logger.fatal("File destination folder is not configured. Please set " + + propPrefix + "." + PROP_HDFS_DIR + ". name=" + getName()); + return; + } + String logSubFolder = MiscUtil.getStringProperty(props, propPrefix + "." + PROP_HDFS_SUBDIR); if (logSubFolder == null || logSubFolder.isEmpty()) { @@ -89,12 +95,6 @@ public class HDFSAuditDestination extends AuditDestination { logFileNameFormat = "%app-type%_ranger_audit_%hostname%" + ".log"; } - if (logFolderProp == null || logFolderProp.isEmpty()) { - logger.fatal("File destination folder is not configured. Please set " - + propPrefix + "." + PROP_HDFS_DIR + ". name=" + getName()); - return; - } - logFolder = logFolderProp + "/" + logSubFolder; logger.info("logFolder=" + logFolder + ", destName=" + getName()); logger.info("logFileNameFormat=" + logFileNameFormat + ", destName=" @@ -104,7 +104,12 @@ public class HDFSAuditDestination extends AuditDestination { } @Override - public boolean logJSON(Collection<String> events) { + synchronized public boolean logJSON(Collection<String> events) { + if (isStopped) { + logError("log() called after stop was requested. name=" + getName()); + return false; + } + try { PrintWriter out = getLogFileStream(); for (String event : events) { @@ -125,7 +130,7 @@ public class HDFSAuditDestination extends AuditDestination { * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection) */ @Override - synchronized public boolean log(Collection<AuditEventBase> events) { + public boolean log(Collection<AuditEventBase> events) { if (isStopped) { logError("log() called after stop was requested. name=" + getName()); return false; @@ -155,15 +160,16 @@ public class HDFSAuditDestination extends AuditDestination { @Override synchronized public void stop() { - try { - if (logWriter != null) { + isStopped = true; + if (logWriter != null) { + try { logWriter.flush(); logWriter.close(); - logWriter = null; - isStopped = true; + } catch (Throwable t) { + logger.error("Error on closing log writter. Exception will be ignored. name=" + + getName() + ", fileName=" + currentFileName); } - } catch (Throwable t) { - logger.error("Error closing HDFS file.", t); + logWriter = null; } } @@ -198,9 +204,11 @@ public class HDFSAuditDestination extends AuditDestination { String extension = defaultPath.substring(lastDot); fullPath = baseName + "." + i + extension; hdfPath = new Path(fullPath); - logger.info("Checking whether log file exists. hdfPath=" + fullPath); + logger.info("Checking whether log file exists. hdfPath=" + + fullPath); } - logger.info("Log file doesn't exists. Will create and use it. hdfPath=" + fullPath); + logger.info("Log file doesn't exists. Will create and use it. hdfPath=" + + fullPath); // Create parent folders createParents(hdfPath, fileSystem); @@ -234,8 +242,14 @@ public class HDFSAuditDestination extends AuditDestination { if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) { logger.info("Closing file. Rolling over. name=" + getName() + ", fileName=" + currentFileName); - logWriter.flush(); - logWriter.close(); + try { + logWriter.flush(); + logWriter.close(); + } catch (Throwable t) { + logger.error("Error on closing log writter. Exception will be ignored. name=" + + getName() + ", fileName=" + currentFileName); + } + logWriter = null; currentFileName = null; } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java index 39a2578..2c6a87f 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java @@ -32,8 +32,8 @@ public abstract class AuditEventBase { public abstract String getEventKey(); public abstract Date getEventTime (); - public abstract void setEventCount(long frequencyCount); - public abstract void setEventDurationMS(long frequencyDurationMS); + public abstract void setEventCount(long eventCount); + public abstract void setEventDurationMS(long eventDurationMS); protected String trim(String str, int len) { String ret = str; http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java index 53adc86..c3a0c78 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java @@ -68,7 +68,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements mQueue = new ArrayBlockingQueue<AuditEventBase>(mMaxQueueSize); } - public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval, AuditProvider provider) { + public AsyncAuditProvider(String name, int maxQueueSize, int maxFlushInterval, AuditHandler provider) { this(name, maxQueueSize, maxFlushInterval); addAuditProvider(provider); @@ -174,21 +174,21 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements while(ret == null) { logSummaryIfRequired(); - if (mMaxFlushInterval > 0 && isFlushPending()) { - long timeTillNextFlush = getTimeTillNextFlush(); - - if (timeTillNextFlush <= 0) { - break; // force flush - } - - ret = mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS); - } else { +// if (mMaxFlushInterval > 0 && isFlushPending()) { +// long timeTillNextFlush = getTimeTillNextFlush(); +// +// if (timeTillNextFlush <= 0) { +// break; // force flush +// } +// +// ret = mQueue.poll(timeTillNextFlush, TimeUnit.MILLISECONDS); +// } else { // Let's wake up for summary logging long waitTime = intervalLogDurationMS - (System.currentTimeMillis() - lastIntervalLogTime); waitTime = waitTime <= 0 ? intervalLogDurationMS : waitTime; ret = mQueue.poll(waitTime, TimeUnit.MILLISECONDS); - } +// } } if(ret != null) { @@ -246,23 +246,23 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements LOG.debug("<== AsyncAuditProvider.waitToComplete()"); } - private long getTimeTillNextFlush() { - long timeTillNextFlush = mMaxFlushInterval; - - if (mMaxFlushInterval > 0) { - long lastFlushTime = getLastFlushTime(); - - if (lastFlushTime != 0) { - long timeSinceLastFlush = System.currentTimeMillis() - - lastFlushTime; - - if (timeSinceLastFlush >= mMaxFlushInterval) - timeTillNextFlush = 0; - else - timeTillNextFlush = mMaxFlushInterval - timeSinceLastFlush; - } - } - - return timeTillNextFlush; - } +// private long getTimeTillNextFlush() { +// long timeTillNextFlush = mMaxFlushInterval; +// +// if (mMaxFlushInterval > 0) { +// long lastFlushTime = getLastFlushTime(); +// +// if (lastFlushTime != 0) { +// long timeSinceLastFlush = System.currentTimeMillis() +// - lastFlushTime; +// +// if (timeSinceLastFlush >= mMaxFlushInterval) +// timeTillNextFlush = 0; +// else +// timeTillNextFlush = mMaxFlushInterval - timeSinceLastFlush; +// } +// } +// +// return timeTillNextFlush; +// } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java new file mode 100644 index 0000000..7b51f1d --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditHandler.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ranger.audit.provider; + +import java.util.Collection; +import java.util.Properties; + +import org.apache.ranger.audit.model.AuditEventBase; + +public interface AuditHandler { + public boolean log(AuditEventBase event); + public boolean log(Collection<AuditEventBase> events); + + public boolean logJSON(String event); + public boolean logJSON(Collection<String> events); + + public void init(Properties prop); + public void init(Properties prop, String basePropertyName); + public void start(); + public void stop(); + public void waitToComplete(); + public void waitToComplete(long timeout); + + /** + * Name for this provider. Used only during logging. Uniqueness is not guaranteed + */ + public String getName(); + + public void flush(); +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java deleted file mode 100644 index 0e38624..0000000 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ranger.audit.provider; - -import java.util.Collection; -import java.util.Properties; - -import org.apache.ranger.audit.model.AuditEventBase; - -public interface AuditProvider { - public boolean log(AuditEventBase event); - public boolean log(Collection<AuditEventBase> events); - - public boolean logJSON(String event); - public boolean logJSON(Collection<String> events); - - public void init(Properties prop); - public void init(Properties prop, String basePropertyName); - public void start(); - public void stop(); - public void waitToComplete(); - public void waitToComplete(long timeout); - - /** - * Name for this provider. Used only during logging. Uniqueness is not guaranteed - */ - public String getName(); - - /** - * If this AuditProvider in the state of shutdown - * @return - */ - public boolean isDrain(); - - public int getMaxBatchSize(); - public int getMaxBatchInterval(); - public boolean isFlushPending(); - public long getLastFlushTime(); - public void flush(); -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java index a67f7e0..7b2b52b 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java @@ -31,6 +31,7 @@ import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider; import org.apache.ranger.audit.provider.solr.SolrAuditProvider; import org.apache.ranger.audit.queue.AuditAsyncQueue; import org.apache.ranger.audit.queue.AuditBatchQueue; +import org.apache.ranger.audit.queue.AuditQueue; import org.apache.ranger.audit.queue.AuditSummaryQueue; /* @@ -58,7 +59,7 @@ public class AuditProviderFactory { private static AuditProviderFactory sFactory; - private AuditProvider mProvider = null; + private AuditHandler mProvider = null; private boolean mInitDone = false; private AuditProviderFactory() { @@ -79,11 +80,11 @@ public class AuditProviderFactory { return sFactory; } - public static AuditProvider getAuditProvider() { + public static AuditHandler getAuditProvider() { return AuditProviderFactory.getInstance().getProvider(); } - public AuditProvider getProvider() { + public AuditHandler getProvider() { return mProvider; } @@ -118,7 +119,7 @@ public class AuditProviderFactory { boolean isAuditToSolrEnabled = MiscUtil.getBooleanProperty(props, AUDIT_SOLR_IS_ENABLED_PROP, false); - List<AuditProvider> providers = new ArrayList<AuditProvider>(); + List<AuditHandler> providers = new ArrayList<AuditHandler>(); // TODO: Delete me for (Object propNameObj : props.keySet()) { @@ -150,17 +151,16 @@ public class AuditProviderFactory { for (String destName : destNameList) { String destPropPrefix = AUDIT_DEST_BASE + "." + destName; - AuditProvider destProvider = getProviderFromConfig(props, - destPropPrefix, destName); + AuditHandler destProvider = getProviderFromConfig(props, + destPropPrefix, destName, null); if (destProvider != null) { destProvider.init(props, destPropPrefix); String queueName = MiscUtil.getStringProperty(props, - destPropPrefix + "." + BaseAuditProvider.PROP_QUEUE); + destPropPrefix + "." + AuditQueue.PROP_QUEUE); if (queueName == null || queueName.isEmpty()) { - LOG.info(destPropPrefix + "." - + BaseAuditProvider.PROP_QUEUE + LOG.info(destPropPrefix + "." + AuditQueue.PROP_QUEUE + " is not set. Setting queue to batch for " + destName); queueName = "batch"; @@ -169,16 +169,15 @@ public class AuditProviderFactory { if (queueName != null && !queueName.isEmpty() && !queueName.equalsIgnoreCase("none")) { String queuePropPrefix = destPropPrefix + "." + queueName; - AuditProvider queueProvider = getProviderFromConfig(props, - queuePropPrefix, queueName); + AuditHandler queueProvider = getProviderFromConfig(props, + queuePropPrefix, queueName, destProvider); if (queueProvider != null) { - if (queueProvider instanceof BaseAuditProvider) { - BaseAuditProvider qProvider = (BaseAuditProvider) queueProvider; - qProvider.setConsumer(destProvider); + if (queueProvider instanceof AuditQueue) { + AuditQueue qProvider = (AuditQueue) queueProvider; qProvider.init(props, queuePropPrefix); providers.add(queueProvider); } else { - LOG.fatal("Provider queue doesn't extend BaseAuditProvider destination " + LOG.fatal("Provider queue doesn't extend AuditQueue. Destination=" + destName + " can't be created. queueName=" + queueName); @@ -196,51 +195,51 @@ public class AuditProviderFactory { } if (providers.size() > 0) { LOG.info("Using v3 audit configuration"); - AuditAsyncQueue asyncQueue = new AuditAsyncQueue(); - String propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX + "." - + "async"; - asyncQueue.init(props, propPrefix); + AuditHandler consumer = providers.get(0); + + // Possible pipeline is: + // async_queue -> summary_queue -> multidestination -> batch_queue + // -> hdfs_destination + // -> batch_queue -> solr_destination + // -> batch_queue -> kafka_destination + // Above, up to multidestination, the providers are same, then it + // branches out in parallel. + + // Set the providers in the reverse order e.g. + + if (providers.size() > 1) { + // If there are more than one destination, then we need multi + // destination to process it in parallel + LOG.info("MultiDestAuditProvider is used. Destination count=" + + providers.size()); + MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider(); + multiDestProvider.init(props); + multiDestProvider.addAuditProviders(providers); + consumer = multiDestProvider; + } - propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX; + // Let's see if Summary is enabled, then summarize before sending it + // downstream + String propPrefix = BaseAuditHandler.PROP_DEFAULT_PREFIX; boolean summaryEnabled = MiscUtil.getBooleanProperty(props, propPrefix + "." + "summary" + "." + "enabled", false); AuditSummaryQueue summaryQueue = null; if (summaryEnabled) { LOG.info("AuditSummaryQueue is enabled"); - summaryQueue = new AuditSummaryQueue(); + summaryQueue = new AuditSummaryQueue(consumer); summaryQueue.init(props, propPrefix); - asyncQueue.setConsumer(summaryQueue); + consumer = summaryQueue; } else { LOG.info("AuditSummaryQueue is disabled"); } - if (providers.size() == 1) { - if (summaryEnabled) { - LOG.info("Setting " + providers.get(0).getName() - + " as consumer to AuditSummaryQueue"); - summaryQueue.setConsumer(providers.get(0)); - } else { - LOG.info("Setting " + providers.get(0).getName() - + " as consumer to " + asyncQueue.getName()); - asyncQueue.setConsumer(providers.get(0)); - } - } else { - MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider(); - multiDestProvider.init(props); - multiDestProvider.addAuditProviders(providers); - if (summaryEnabled) { - LOG.info("Setting " + multiDestProvider.getName() - + " as consumer to AuditSummaryQueue"); - summaryQueue.setConsumer(multiDestProvider); - } else { - LOG.info("Setting " + multiDestProvider.getName() - + " as consumer to " + asyncQueue.getName()); - asyncQueue.setConsumer(multiDestProvider); - } - } + // Create the AsysnQueue + AuditAsyncQueue asyncQueue = new AuditAsyncQueue(consumer); + propPrefix = BaseAuditHandler.PROP_DEFAULT_PREFIX + "." + "async"; + asyncQueue.init(props, propPrefix); mProvider = asyncQueue; - LOG.info("Starting " + mProvider.getName()); + LOG.info("Starting audit queue " + mProvider.getName()); mProvider.start(); } else { LOG.info("No v3 audit configuration found. Trying v2 audit configurations"); @@ -315,9 +314,7 @@ public class AuditProviderFactory { if (kafkaProvider.isAsync()) { AsyncAuditProvider asyncProvider = new AsyncAuditProvider( - "MyKafkaAuditProvider", - kafkaProvider.getMaxQueueSize(), - kafkaProvider.getMaxBatchInterval(), kafkaProvider); + "MyKafkaAuditProvider", 1000, 1000, kafkaProvider); providers.add(asyncProvider); } else { providers.add(kafkaProvider); @@ -331,9 +328,7 @@ public class AuditProviderFactory { if (solrProvider.isAsync()) { AsyncAuditProvider asyncProvider = new AsyncAuditProvider( - "MySolrAuditProvider", - solrProvider.getMaxQueueSize(), - solrProvider.getMaxBatchInterval(), solrProvider); + "MySolrAuditProvider", 1000, 1000, solrProvider); providers.add(asyncProvider); } else { providers.add(solrProvider); @@ -387,18 +382,26 @@ public class AuditProviderFactory { Runtime.getRuntime().addShutdownHook(jvmShutdownHook); } - private AuditProvider getProviderFromConfig(Properties props, - String propPrefix, String providerName) { - AuditProvider provider = null; + private AuditHandler getProviderFromConfig(Properties props, + String propPrefix, String providerName, AuditHandler consumer) { + AuditHandler provider = null; String className = MiscUtil.getStringProperty(props, propPrefix + "." - + BaseAuditProvider.PROP_CLASS_NAME); + + BaseAuditHandler.PROP_CLASS_NAME); if (className != null && !className.isEmpty()) { try { - provider = (AuditProvider) Class.forName(className) - .newInstance(); + Class<?> handlerClass = Class.forName(className); + if (handlerClass.isAssignableFrom(AuditQueue.class)) { + // Queue class needs consumer + handlerClass.getDeclaredConstructor(AuditHandler.class) + .newInstance(consumer); + } else { + provider = (AuditHandler) Class.forName(className) + .newInstance(); + } } catch (Exception e) { LOG.fatal("Can't instantiate audit class for providerName=" - + providerName + ", className=" + className, e); + + providerName + ", className=" + className + + ", propertyPrefix=" + propPrefix, e); } } else { if (providerName.equals("file")) { @@ -414,25 +417,32 @@ public class AuditProviderFactory { } else if (providerName.equals("log4j")) { provider = new Log4jAuditProvider(); } else if (providerName.equals("batch")) { - provider = new AuditBatchQueue(); + provider = new AuditBatchQueue(consumer); } else if (providerName.equals("async")) { - provider = new AuditAsyncQueue(); + provider = new AuditAsyncQueue(consumer); } else { LOG.error("Provider name doesn't have any class associated with it. providerName=" - + providerName); + + providerName + ", propertyPrefix=" + propPrefix); + } + } + if (provider != null && provider instanceof AuditQueue) { + if (consumer == null) { + LOG.fatal("consumer can't be null for AuditQueue. queue=" + + provider.getName() + ", propertyPrefix=" + propPrefix); + provider = null; } } return provider; } - private AuditProvider getDefaultProvider() { + private AuditHandler getDefaultProvider() { return new DummyAuditProvider(); } private static class JVMShutdownHook extends Thread { - AuditProvider mProvider; + AuditHandler mProvider; - public JVMShutdownHook(AuditProvider provider) { + public JVMShutdownHook(AuditHandler provider) { mProvider = provider; } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java new file mode 100644 index 0000000..601650e --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditHandler.java @@ -0,0 +1,271 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ranger.audit.provider; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.model.AuthzAuditEvent; +import com.google.gson.GsonBuilder; + +import java.util.concurrent.atomic.AtomicLong; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Properties; + +public abstract class BaseAuditHandler implements AuditHandler { + private static final Log LOG = LogFactory.getLog(BaseAuditHandler.class); + + private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms"; + + private int mLogFailureReportMinIntervalInMs = 60 * 1000; + + private AtomicLong mFailedLogLastReportTime = new AtomicLong(0); + private AtomicLong mFailedLogCountSinceLastReport = new AtomicLong(0); + private AtomicLong mFailedLogCountLifeTime = new AtomicLong(0); + + public static final String PROP_NAME = "name"; + public static final String PROP_CLASS_NAME = "classname"; + + public static final String PROP_DEFAULT_PREFIX = "xasecure.audit.provider"; + + protected String propPrefix = PROP_DEFAULT_PREFIX; + + protected String providerName = null; + + protected int failedRetryTimes = 3; + protected int failedRetrySleep = 3 * 1000; + + int errorLogIntervalMS = 30 * 1000; // Every 30 seconds + long lastErrorLogMS = 0; + + protected Properties props = null; + + @Override + public void init(Properties props) { + init(props, null); + } + + @Override + public void init(Properties props, String basePropertyName) { + LOG.info("BaseAuditProvider.init()"); + this.props = props; + if (basePropertyName != null) { + propPrefix = basePropertyName; + } + LOG.info("propPrefix=" + propPrefix); + // Get final token + List<String> tokens = MiscUtil.toArray(propPrefix, "."); + String finalToken = tokens.get(tokens.size() - 1); + + String name = MiscUtil.getStringProperty(props, basePropertyName + "." + + PROP_NAME); + if (name != null && !name.isEmpty()) { + providerName = name; + } + if (providerName == null) { + providerName = finalToken; + LOG.info("Using providerName from property prefix. providerName=" + + providerName); + } + LOG.info("providerName=" + providerName); + + try { + new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create(); + } catch (Throwable excp) { + LOG.warn( + "Log4jAuditProvider.init(): failed to create GsonBuilder object. events will be formated using toString(), instead of Json", + excp); + } + + mLogFailureReportMinIntervalInMs = MiscUtil.getIntProperty(props, + AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000); + + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger. + * audit.model.AuditEventBase) + */ + @Override + public boolean log(AuditEventBase event) { + List<AuditEventBase> eventList = new ArrayList<AuditEventBase>(); + eventList.add(event); + return log(eventList); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.lang.String) + */ + @Override + public boolean logJSON(String event) { + AuditEventBase eventObj = MiscUtil.fromJson(event, + AuthzAuditEvent.class); + return log(eventObj); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.util.Collection + * ) + */ + @Override + public boolean logJSON(Collection<String> events) { + boolean ret = true; + for (String event : events) { + ret = logJSON(event); + if (!ret) { + break; + } + } + return ret; + } + + public void setName(String name) { + providerName = name; + } + + @Override + public String getName() { + return providerName; + } + + public void logFailedEvent(AuditEventBase event) { + logFailedEvent(event, null); + } + + public void logError(String msg) { + long currTimeMS = System.currentTimeMillis(); + if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) { + LOG.error(msg); + lastErrorLogMS = currTimeMS; + } + } + + public void logError(String msg, Throwable ex) { + long currTimeMS = System.currentTimeMillis(); + if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) { + LOG.error(msg, ex); + lastErrorLogMS = currTimeMS; + } + } + + public String getTimeDiffStr(long time1, long time2) { + long timeInMs = Math.abs(time1 - time2); + return formatIntervalForLog(timeInMs); + } + + public String formatIntervalForLog(long timeInMs) { + long hours = timeInMs / (60 * 60 * 1000); + long minutes = (timeInMs / (60 * 1000)) % 60; + long seconds = (timeInMs % (60 * 1000)) / 1000; + long mSeconds = (timeInMs % (1000)); + + if (hours > 0) + return String.format("%02d:%02d:%02d.%03d hours", hours, minutes, + seconds, mSeconds); + else if (minutes > 0) + return String.format("%02d:%02d.%03d minutes", minutes, seconds, + mSeconds); + else if (seconds > 0) + return String.format("%02d.%03d seconds", seconds, mSeconds); + else + return String.format("%03d milli-seconds", mSeconds); + } + + public void logFailedEvent(AuditEventBase event, Throwable excp) { + long now = System.currentTimeMillis(); + + long timeSinceLastReport = now - mFailedLogLastReportTime.get(); + long countSinceLastReport = mFailedLogCountSinceLastReport + .incrementAndGet(); + long countLifeTime = mFailedLogCountLifeTime.incrementAndGet(); + + if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) { + mFailedLogLastReportTime.set(now); + mFailedLogCountSinceLastReport.set(0); + + if (excp != null) { + LOG.warn( + "failed to log audit event: " + + MiscUtil.stringify(event), excp); + } else { + LOG.warn("failed to log audit event: " + + MiscUtil.stringify(event)); + } + + if (countLifeTime > 1) { // no stats to print for the 1st failure + LOG.warn("Log failure count: " + countSinceLastReport + + " in past " + + formatIntervalForLog(timeSinceLastReport) + "; " + + countLifeTime + " during process lifetime"); + } + } + } + + public void logFailedEvent(Collection<AuditEventBase> events, Throwable excp) { + for (AuditEventBase event : events) { + logFailedEvent(event, excp); + } + } + + public void logFailedEventJSON(String event, Throwable excp) { + long now = System.currentTimeMillis(); + + long timeSinceLastReport = now - mFailedLogLastReportTime.get(); + long countSinceLastReport = mFailedLogCountSinceLastReport + .incrementAndGet(); + long countLifeTime = mFailedLogCountLifeTime.incrementAndGet(); + + if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) { + mFailedLogLastReportTime.set(now); + mFailedLogCountSinceLastReport.set(0); + + if (excp != null) { + LOG.warn("failed to log audit event: " + event, excp); + } else { + LOG.warn("failed to log audit event: " + event); + } + + if (countLifeTime > 1) { // no stats to print for the 1st failure + LOG.warn("Log failure count: " + countSinceLastReport + + " in past " + + formatIntervalForLog(timeSinceLastReport) + "; " + + countLifeTime + " during process lifetime"); + } + } + } + + public void logFailedEventJSON(Collection<String> events, Throwable excp) { + for (String event : events) { + logFailedEventJSON(event, excp); + } + } + + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java deleted file mode 100644 index 85c207b..0000000 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java +++ /dev/null @@ -1,432 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.ranger.audit.provider; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ranger.audit.model.AuditEventBase; -import org.apache.ranger.audit.model.AuthzAuditEvent; -import org.apache.ranger.audit.queue.AuditFileSpool; - -import com.google.gson.GsonBuilder; - -import java.util.concurrent.atomic.AtomicLong; -import java.util.ArrayList; -import java.util.Collection; -import java.util.List; -import java.util.Properties; - -public abstract class BaseAuditProvider implements AuditProvider { - private static final Log LOG = LogFactory.getLog(BaseAuditProvider.class); - - private static final String AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP = "xasecure.audit.log.failure.report.min.interval.ms"; - public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024; - public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000; - public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000; - - private AtomicLong lifeTimeInLogCount = new AtomicLong(0); - - private int mLogFailureReportMinIntervalInMs = 60 * 1000; - - private AtomicLong mFailedLogLastReportTime = new AtomicLong(0); - private AtomicLong mFailedLogCountSinceLastReport = new AtomicLong(0); - private AtomicLong mFailedLogCountLifeTime = new AtomicLong(0); - - public static final String PROP_NAME = "name"; - public static final String PROP_CLASS_NAME = "classname"; - public static final String PROP_QUEUE = "queue"; - - public static final String PROP_BATCH_SIZE = "batch.size"; - public static final String PROP_QUEUE_SIZE = "queue.size"; - public static final String PROP_BATCH_INTERVAL = "batch.interval.ms"; - - public static final String PROP_FILE_SPOOL_ENABLE = "filespool.enable"; - public static final String PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN = "filespool.drain.full.wait.ms"; - public static final String PROP_FILE_SPOOL_QUEUE_THRESHOLD = "filespool.drain.threshold.percent"; - - public static final String PROP_DEFAULT_PREFIX = "xasecure.audit.provider"; - - private boolean isDrain = false; - private String providerName = null; - - private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT; - private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS; - private int maxBatchSize = AUDIT_BATCH_SIZE_DEFAULT; - - protected int failedRetryTimes = 3; - protected int failedRetrySleep = 3 * 1000; - - protected AuditProvider consumer = null; - protected AuditFileSpool fileSpooler = null; - - protected boolean fileSpoolerEnabled = false; - protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes - protected int fileSpoolDrainThresholdPercent = 80; - - int errorLogIntervalMS = 30 * 1000; // Every 30 seconds - long lastErrorLogMS = 0; - - protected Properties props = null; - - public BaseAuditProvider() { - } - - public BaseAuditProvider(AuditProvider consumer) { - this.consumer = consumer; - } - - @Override - public void init(Properties props) { - init(props, null); - } - - @Override - public void init(Properties props, String basePropertyName) { - LOG.info("BaseAuditProvider.init()"); - this.props = props; - String propPrefix = PROP_DEFAULT_PREFIX; - if (basePropertyName != null) { - propPrefix = basePropertyName; - } - LOG.info("propPrefix=" + propPrefix); - // Get final token - List<String> tokens = MiscUtil.toArray(propPrefix, "."); - String finalToken = tokens.get(tokens.size() - 1); - - String name = MiscUtil.getStringProperty(props, basePropertyName + "." - + PROP_NAME); - if (name != null && !name.isEmpty()) { - providerName = name; - } - if (providerName == null) { - providerName = finalToken; - LOG.info("Using providerName from property prefix. providerName=" - + providerName); - } - LOG.info("providerName=" + providerName); - - setMaxBatchSize(MiscUtil.getIntProperty(props, propPrefix + "." - + PROP_BATCH_SIZE, getMaxBatchSize())); - setMaxQueueSize(MiscUtil.getIntProperty(props, propPrefix + "." - + PROP_QUEUE_SIZE, getMaxQueueSize())); - setMaxBatchInterval(MiscUtil.getIntProperty(props, propPrefix + "." - + PROP_BATCH_INTERVAL, getMaxBatchInterval())); - - fileSpoolerEnabled = MiscUtil.getBooleanProperty(props, propPrefix - + "." + PROP_FILE_SPOOL_ENABLE, false); - String logFolderProp = MiscUtil.getStringProperty(props, propPrefix - + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR); - if (fileSpoolerEnabled || logFolderProp != null) { - LOG.info("File spool is enabled for " + getName() - + ", logFolderProp=" + logFolderProp + ", " + propPrefix - + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR + "=" - + fileSpoolerEnabled); - fileSpoolerEnabled = true; - fileSpoolMaxWaitTime = MiscUtil.getIntProperty(props, propPrefix - + "." + PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN, - fileSpoolMaxWaitTime); - fileSpoolDrainThresholdPercent = MiscUtil.getIntProperty(props, - propPrefix + "." + PROP_FILE_SPOOL_QUEUE_THRESHOLD, - fileSpoolDrainThresholdPercent); - fileSpooler = new AuditFileSpool(this, consumer); - fileSpooler.init(props, basePropertyName); - } else { - LOG.info("File spool is disabled for " + getName()); - } - - try { - new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create(); - } catch (Throwable excp) { - LOG.warn( - "Log4jAuditProvider.init(): failed to create GsonBuilder object. events will be formated using toString(), instead of Json", - excp); - } - - mLogFailureReportMinIntervalInMs = MiscUtil.getIntProperty(props, - AUDIT_LOG_FAILURE_REPORT_MIN_INTERVAL_PROP, 60 * 1000); - - } - - public AuditProvider getConsumer() { - return consumer; - } - - public void setConsumer(AuditProvider consumer) { - this.consumer = consumer; - } - - public void logFailedEvent(AuditEventBase event) { - logFailedEvent(event, null); - } - - public void logFailedEvent(AuditEventBase event, Throwable excp) { - long now = System.currentTimeMillis(); - - long timeSinceLastReport = now - mFailedLogLastReportTime.get(); - long countSinceLastReport = mFailedLogCountSinceLastReport - .incrementAndGet(); - long countLifeTime = mFailedLogCountLifeTime.incrementAndGet(); - - if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) { - mFailedLogLastReportTime.set(now); - mFailedLogCountSinceLastReport.set(0); - - if (excp != null) { - LOG.warn( - "failed to log audit event: " - + MiscUtil.stringify(event), excp); - } else { - LOG.warn("failed to log audit event: " - + MiscUtil.stringify(event)); - } - - if (countLifeTime > 1) { // no stats to print for the 1st failure - LOG.warn("Log failure count: " + countSinceLastReport - + " in past " - + formatIntervalForLog(timeSinceLastReport) + "; " - + countLifeTime + " during process lifetime"); - } - } - } - - public void logFailedEvent(Collection<AuditEventBase> events, Throwable excp) { - for (AuditEventBase event : events) { - logFailedEvent(event, excp); - } - } - - public void logFailedEventJSON(String event, Throwable excp) { - long now = System.currentTimeMillis(); - - long timeSinceLastReport = now - mFailedLogLastReportTime.get(); - long countSinceLastReport = mFailedLogCountSinceLastReport - .incrementAndGet(); - long countLifeTime = mFailedLogCountLifeTime.incrementAndGet(); - - if (timeSinceLastReport >= mLogFailureReportMinIntervalInMs) { - mFailedLogLastReportTime.set(now); - mFailedLogCountSinceLastReport.set(0); - - if (excp != null) { - LOG.warn("failed to log audit event: " + event, excp); - } else { - LOG.warn("failed to log audit event: " + event); - } - - if (countLifeTime > 1) { // no stats to print for the 1st failure - LOG.warn("Log failure count: " + countSinceLastReport - + " in past " - + formatIntervalForLog(timeSinceLastReport) + "; " - + countLifeTime + " during process lifetime"); - } - } - } - - public void logFailedEventJSON(Collection<String> events, Throwable excp) { - for (String event : events) { - logFailedEventJSON(event, excp); - } - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger. - * audit.model.AuditEventBase) - */ - @Override - public boolean log(AuditEventBase event) { - List<AuditEventBase> eventList = new ArrayList<AuditEventBase>(); - eventList.add(event); - return log(eventList); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.lang.String) - */ - @Override - public boolean logJSON(String event) { - AuditEventBase eventObj = MiscUtil.fromJson(event, - AuthzAuditEvent.class); - return log(eventObj); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.ranger.audit.provider.AuditProvider#logJSON(java.util.Collection - * ) - */ - @Override - public boolean logJSON(Collection<String> events) { - boolean ret = true; - for (String event : events) { - ret = logJSON(event); - if (!ret) { - break; - } - } - return ret; - } - - public void setName(String name) { - providerName = name; - } - - @Override - public String getName() { - return providerName; - } - - @Override - public boolean isDrain() { - return isDrain; - } - - public void setDrain(boolean isDrain) { - this.isDrain = isDrain; - } - - public int getMaxQueueSize() { - return maxQueueSize; - } - - public void setMaxQueueSize(int maxQueueSize) { - this.maxQueueSize = maxQueueSize; - } - - @Override - public int getMaxBatchInterval() { - return maxBatchInterval; - } - - public void setMaxBatchInterval(int maxBatchInterval) { - this.maxBatchInterval = maxBatchInterval; - } - - @Override - public int getMaxBatchSize() { - return maxBatchSize; - } - - public void setMaxBatchSize(int maxBatchSize) { - this.maxBatchSize = maxBatchSize; - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete() - */ - @Override - public void waitToComplete() { - if (consumer != null) { - consumer.waitToComplete(-1); - } - } - - @Override - public void waitToComplete(long timeout) { - if (consumer != null) { - consumer.waitToComplete(timeout); - } - } - - @Override - public boolean isFlushPending() { - return false; - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime() - */ - @Override - public long getLastFlushTime() { - if (consumer != null) { - return consumer.getLastFlushTime(); - } - return 0; - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#flush() - */ - @Override - public void flush() { - if (consumer != null) { - consumer.flush(); - } - } - - public AtomicLong getLifeTimeInLogCount() { - return lifeTimeInLogCount; - } - - public long addLifeTimeInLogCount(long count) { - return lifeTimeInLogCount.addAndGet(count); - } - - public void logError(String msg) { - long currTimeMS = System.currentTimeMillis(); - if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) { - LOG.error(msg); - lastErrorLogMS = currTimeMS; - } - } - - public void logError(String msg, Throwable ex) { - long currTimeMS = System.currentTimeMillis(); - if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) { - LOG.error(msg, ex); - lastErrorLogMS = currTimeMS; - } - } - - public String getTimeDiffStr(long time1, long time2) { - long timeInMs = Math.abs(time1 - time2); - return formatIntervalForLog(timeInMs); - } - - public String formatIntervalForLog(long timeInMs) { - long hours = timeInMs / (60 * 60 * 1000); - long minutes = (timeInMs / (60 * 1000)) % 60; - long seconds = (timeInMs % (60 * 1000)) / 1000; - long mSeconds = (timeInMs % (1000)); - - if (hours > 0) - return String.format("%02d:%02d:%02d.%03d hours", hours, minutes, - seconds, mSeconds); - else if (minutes > 0) - return String.format("%02d:%02d.%03d minutes", minutes, seconds, - mSeconds); - else if (seconds > 0) - return String.format("%02d.%03d seconds", seconds, mSeconds); - else - return String.format("%03d milli-seconds", mSeconds); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java index ab6a74a..ca842f3 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java @@ -23,7 +23,7 @@ import java.util.Properties; import org.apache.ranger.audit.model.AuditEventBase; import org.apache.ranger.audit.model.AuthzAuditEvent; -public abstract class BufferedAuditProvider extends BaseAuditProvider { +public abstract class BufferedAuditProvider extends BaseAuditHandler { private LogBuffer<AuditEventBase> mBuffer = null; private LogDestination<AuditEventBase> mDestination = null; @@ -107,16 +107,6 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider { } @Override - public boolean isFlushPending() { - return false; - } - - @Override - public long getLastFlushTime() { - return 0; - } - - @Override public void flush() { } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java index f4bd90c..d475f89 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java @@ -177,16 +177,6 @@ public class DbAuditProvider extends AuditDestination { } @Override - public boolean isFlushPending() { - return mUncommitted.size() > 0; - } - - @Override - public long getLastFlushTime() { - return mLastCommitTime; - } - - @Override public void flush() { if(mUncommitted.size() > 0) { boolean isSuccess = commitTransaction(); http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java index 619a99d..05f882f 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java @@ -24,7 +24,7 @@ import org.apache.ranger.audit.model.AuditEventBase; import org.apache.ranger.audit.model.AuthzAuditEvent; -public class DummyAuditProvider implements AuditProvider { +public class DummyAuditProvider implements AuditHandler { @Override public void init(Properties prop) { // intentionally left empty @@ -74,23 +74,6 @@ public class DummyAuditProvider implements AuditProvider { // intentionally left empty } - - @Override - public int getMaxBatchSize() { - // TODO Auto-generated method stub - return 0; - } - - @Override - public boolean isFlushPending() { - return false; - } - - @Override - public long getLastFlushTime() { - return 0; - } - @Override public void flush() { // intentionally left empty @@ -120,20 +103,4 @@ public class DummyAuditProvider implements AuditProvider { return this.getClass().getName(); } - /* (non-Javadoc) - * @see org.apache.ranger.audit.provider.AuditProvider#isDrain() - */ - @Override - public boolean isDrain() { - return false; - } - - /* (non-Javadoc) - * @see org.apache.ranger.audit.provider.AuditProvider#getMaxBatchInterval() - */ - @Override - public int getMaxBatchInterval() { - // TODO Auto-generated method stub - return 0; - } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java index 040a045..0402de2 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java @@ -27,8 +27,6 @@ import org.apache.ranger.audit.destination.AuditDestination; import org.apache.ranger.audit.model.AuditEventBase; import org.apache.ranger.audit.model.AuthzAuditEvent; -import com.sun.tools.hat.internal.util.Misc; - public class Log4jAuditProvider extends AuditDestination { http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java index 876fa5b..4c1593a 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java @@ -26,18 +26,18 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.audit.model.AuditEventBase; -public class MultiDestAuditProvider extends BaseAuditProvider { +public class MultiDestAuditProvider extends BaseAuditHandler { private static final Log LOG = LogFactory .getLog(MultiDestAuditProvider.class); - protected List<AuditProvider> mProviders = new ArrayList<AuditProvider>(); + protected List<AuditHandler> mProviders = new ArrayList<AuditHandler>(); public MultiDestAuditProvider() { LOG.info("MultiDestAuditProvider: creating.."); } - public MultiDestAuditProvider(AuditProvider provider) { + public MultiDestAuditProvider(AuditHandler provider) { addAuditProvider(provider); } @@ -47,7 +47,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider { super.init(props); - for (AuditProvider provider : mProviders) { + for (AuditHandler provider : mProviders) { try { provider.init(props); } catch (Throwable excp) { @@ -57,7 +57,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider { } } - public void addAuditProvider(AuditProvider provider) { + public void addAuditProvider(AuditHandler provider) { if (provider != null) { LOG.info("MultiDestAuditProvider.addAuditProvider(providerType=" + provider.getClass().getCanonicalName() + ")"); @@ -66,9 +66,9 @@ public class MultiDestAuditProvider extends BaseAuditProvider { } } - public void addAuditProviders(List<AuditProvider> providers) { + public void addAuditProviders(List<AuditHandler> providers) { if (providers != null) { - for (AuditProvider provider : providers) { + for (AuditHandler provider : providers) { LOG.info("Adding " + provider.getName() + " as consumer to MultiDestination " + getName()); addAuditProvider(provider); @@ -78,7 +78,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider { @Override public boolean log(AuditEventBase event) { - for (AuditProvider provider : mProviders) { + for (AuditHandler provider : mProviders) { try { provider.log(event); } catch (Throwable excp) { @@ -90,7 +90,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider { @Override public boolean log(Collection<AuditEventBase> events) { - for (AuditProvider provider : mProviders) { + for (AuditHandler provider : mProviders) { try { provider.log(events); } catch (Throwable excp) { @@ -102,7 +102,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider { @Override public boolean logJSON(String event) { - for (AuditProvider provider : mProviders) { + for (AuditHandler provider : mProviders) { try { provider.logJSON(event); } catch (Throwable excp) { @@ -114,7 +114,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider { @Override public boolean logJSON(Collection<String> events) { - for (AuditProvider provider : mProviders) { + for (AuditHandler provider : mProviders) { try { provider.logJSON(events); } catch (Throwable excp) { @@ -126,7 +126,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider { @Override public void start() { - for (AuditProvider provider : mProviders) { + for (AuditHandler provider : mProviders) { try { provider.start(); } catch (Throwable excp) { @@ -138,7 +138,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider { @Override public void stop() { - for (AuditProvider provider : mProviders) { + for (AuditHandler provider : mProviders) { try { provider.stop(); } catch (Throwable excp) { @@ -150,7 +150,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider { @Override public void waitToComplete() { - for (AuditProvider provider : mProviders) { + for (AuditHandler provider : mProviders) { try { provider.waitToComplete(); } catch (Throwable excp) { @@ -163,7 +163,7 @@ public class MultiDestAuditProvider extends BaseAuditProvider { @Override public void waitToComplete(long timeout) { - for (AuditProvider provider : mProviders) { + for (AuditHandler provider : mProviders) { try { provider.waitToComplete(timeout); } catch (Throwable excp) { @@ -175,35 +175,8 @@ public class MultiDestAuditProvider extends BaseAuditProvider { } @Override - public boolean isFlushPending() { - for (AuditProvider provider : mProviders) { - if (provider.isFlushPending()) { - return true; - } - } - - return false; - } - - @Override - public long getLastFlushTime() { - long lastFlushTime = 0; - for (AuditProvider provider : mProviders) { - long flushTime = provider.getLastFlushTime(); - - if (flushTime != 0) { - if (lastFlushTime == 0 || lastFlushTime > flushTime) { - lastFlushTime = flushTime; - } - } - } - - return lastFlushTime; - } - - @Override public void flush() { - for (AuditProvider provider : mProviders) { + for (AuditHandler provider : mProviders) { try { provider.flush(); } catch (Throwable excp) { http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java index 5f39e69..2c77b40 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/kafka/KafkaAuditProvider.java @@ -25,12 +25,12 @@ import kafka.producer.ProducerConfig; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.destination.AuditDestination; import org.apache.ranger.audit.model.AuditEventBase; import org.apache.ranger.audit.model.AuthzAuditEvent; -import org.apache.ranger.audit.provider.BaseAuditProvider; import org.apache.ranger.audit.provider.MiscUtil; -public class KafkaAuditProvider extends BaseAuditProvider { +public class KafkaAuditProvider extends AuditDestination { private static final Log LOG = LogFactory.getLog(KafkaAuditProvider.class); public static final String AUDIT_MAX_QUEUE_SIZE_PROP = "xasecure.audit.kafka.async.max.queue.size"; @@ -47,11 +47,6 @@ public class KafkaAuditProvider extends BaseAuditProvider { LOG.info("init() called"); super.init(props); - setMaxQueueSize(MiscUtil.getIntProperty(props, - AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_MAX_QUEUE_SIZE_DEFAULT)); - setMaxBatchInterval(MiscUtil.getIntProperty(props, - AUDIT_MAX_QUEUE_SIZE_PROP, - AUDIT_BATCH_INTERVAL_DEFAULT_MS)); topic = MiscUtil.getStringProperty(props, AUDIT_KAFKA_TOPIC_NAME); if (topic == null || topic.isEmpty()) { @@ -176,19 +171,6 @@ public class KafkaAuditProvider extends BaseAuditProvider { } @Override - public boolean isFlushPending() { - LOG.info("isFlushPending() called"); - return false; - } - - @Override - public long getLastFlushTime() { - LOG.info("getLastFlushTime() called"); - - return 0; - } - - @Override public void flush() { LOG.info("flush() called"); http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java index 9ee4ec0..53e4348 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/solr/SolrAuditProvider.java @@ -25,16 +25,16 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.destination.AuditDestination; import org.apache.ranger.audit.model.AuditEventBase; import org.apache.ranger.audit.model.AuthzAuditEvent; -import org.apache.ranger.audit.provider.BaseAuditProvider; import org.apache.ranger.audit.provider.MiscUtil; import org.apache.solr.client.solrj.SolrClient; import org.apache.solr.client.solrj.impl.HttpSolrClient; import org.apache.solr.client.solrj.response.UpdateResponse; import org.apache.solr.common.SolrInputDocument; -public class SolrAuditProvider extends BaseAuditProvider { +public class SolrAuditProvider extends AuditDestination { private static final Log LOG = LogFactory.getLog(SolrAuditProvider.class); public static final String AUDIT_MAX_QUEUE_SIZE_PROP = "xasecure.audit.solr.async.max.queue.size"; @@ -56,11 +56,6 @@ public class SolrAuditProvider extends BaseAuditProvider { LOG.info("init() called"); super.init(props); - setMaxQueueSize(MiscUtil.getIntProperty(props, - AUDIT_MAX_QUEUE_SIZE_PROP, AUDIT_MAX_QUEUE_SIZE_DEFAULT)); - setMaxBatchInterval(MiscUtil.getIntProperty(props, - AUDIT_MAX_QUEUE_SIZE_PROP, - AUDIT_BATCH_INTERVAL_DEFAULT_MS)); retryWaitTime = MiscUtil.getIntProperty(props, AUDIT_RETRY_WAIT_PROP, retryWaitTime); } @@ -241,29 +236,7 @@ public class SolrAuditProvider extends BaseAuditProvider { public void waitToComplete(long timeout) { } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() - */ - @Override - public boolean isFlushPending() { - // TODO Auto-generated method stub - return false; - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#getLastFlushTime() - */ - @Override - public long getLastFlushTime() { - // TODO Auto-generated method stub - return 0; - } - + /* * (non-Javadoc) * http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java index a6f291d..d16fff9 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java @@ -21,32 +21,27 @@ package org.apache.ranger.audit.queue; import java.util.ArrayList; import java.util.Collection; -import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.LinkedBlockingQueue; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.audit.model.AuditEventBase; -import org.apache.ranger.audit.provider.AuditProvider; -import org.apache.ranger.audit.provider.BaseAuditProvider; +import org.apache.ranger.audit.provider.AuditHandler; /** * This is a non-blocking queue with no limit on capacity. */ -public class AuditAsyncQueue extends BaseAuditProvider implements Runnable { +public class AuditAsyncQueue extends AuditQueue implements Runnable { private static final Log logger = LogFactory.getLog(AuditAsyncQueue.class); - LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>(); + LinkedBlockingQueue<AuditEventBase> queue = new LinkedBlockingQueue<AuditEventBase>(); Thread consumerThread = null; static final int MAX_DRAIN = 1000; static int threadCount = 0; static final String DEFAULT_NAME = "async"; - public AuditAsyncQueue() { - setName(DEFAULT_NAME); - } - - public AuditAsyncQueue(AuditProvider consumer) { + public AuditAsyncQueue(AuditHandler consumer) { super(consumer); setName(DEFAULT_NAME); } @@ -65,7 +60,6 @@ public class AuditAsyncQueue extends BaseAuditProvider implements Runnable { return false; } queue.add(event); - addLifeTimeInLogCount(1); return true; } @@ -90,6 +84,9 @@ public class AuditAsyncQueue extends BaseAuditProvider implements Runnable { public void start() { if (consumer != null) { consumer.start(); + } else { + logger.error("consumer is not set. Nothing will be sent to any consumer. name=" + + getName()); } consumerThread = new Thread(this, this.getClass().getName() @@ -110,23 +107,10 @@ public class AuditAsyncQueue extends BaseAuditProvider implements Runnable { if (consumerThread != null) { consumerThread.interrupt(); } - consumerThread = null; } catch (Throwable t) { // ignore any exception } - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() - */ - @Override - public boolean isFlushPending() { - if (queue.isEmpty()) { - return consumer.isFlushPending(); - } - return true; + consumerThread = null; } /* http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java index 5e21efc..8ed07bd 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java @@ -29,10 +29,9 @@ import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.audit.model.AuditEventBase; -import org.apache.ranger.audit.provider.AuditProvider; -import org.apache.ranger.audit.provider.BaseAuditProvider; +import org.apache.ranger.audit.provider.AuditHandler; -public class AuditBatchQueue extends BaseAuditProvider implements Runnable { +public class AuditBatchQueue extends AuditQueue implements Runnable { private static final Log logger = LogFactory.getLog(AuditBatchQueue.class); private BlockingQueue<AuditEventBase> queue = null; @@ -41,10 +40,7 @@ public class AuditBatchQueue extends BaseAuditProvider implements Runnable { Thread consumerThread = null; static int threadCount = 0; - public AuditBatchQueue() { - } - - public AuditBatchQueue(AuditProvider consumer) { + public AuditBatchQueue(AuditHandler consumer) { super(consumer); } @@ -59,7 +55,6 @@ public class AuditBatchQueue extends BaseAuditProvider implements Runnable { public boolean log(AuditEventBase event) { // Add to batchQueue. Block if full queue.add(event); - addLifeTimeInLogCount(1); return true; } @@ -130,10 +125,10 @@ public class AuditBatchQueue extends BaseAuditProvider implements Runnable { if (consumerThread != null) { consumerThread.interrupt(); } - consumerThread = null; } catch (Throwable t) { // ignore any exception } + consumerThread = null; } /* @@ -187,19 +182,6 @@ public class AuditBatchQueue extends BaseAuditProvider implements Runnable { /* * (non-Javadoc) * - * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() - */ - @Override - public boolean isFlushPending() { - if (queue.isEmpty()) { - return consumer.isFlushPending(); - } - return true; - } - - /* - * (non-Javadoc) - * * @see org.apache.ranger.audit.provider.AuditProvider#flush() */ @Override http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java index 66d1573..a1c32b9 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java @@ -35,13 +35,13 @@ import java.util.Iterator; import java.util.List; import java.util.Properties; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.audit.model.AuditEventBase; -import org.apache.ranger.audit.provider.AuditProvider; +import org.apache.ranger.audit.provider.AuditHandler; import org.apache.ranger.audit.provider.MiscUtil; import com.google.gson.Gson; @@ -69,10 +69,10 @@ public class AuditFileSpool implements Runnable { // "filespool.index.done_filename"; public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = "filespool.destination.retry.ms"; - AuditProvider queueProvider = null; - AuditProvider consumerProvider = null; + AuditQueue queueProvider = null; + AuditHandler consumerProvider = null; - BlockingQueue<AuditIndexRecord> indexQueue = new LinkedTransferQueue<AuditIndexRecord>(); + BlockingQueue<AuditIndexRecord> indexQueue = new LinkedBlockingQueue<AuditIndexRecord>(); // Folder and File attributes File logFolder = null; @@ -108,10 +108,10 @@ public class AuditFileSpool implements Runnable { boolean isDrain = false; boolean isDestDown = true; - private static Gson gson = null; + private Gson gson = null; - public AuditFileSpool(AuditProvider queueProvider, - AuditProvider consumerProvider) { + public AuditFileSpool(AuditQueue queueProvider, + AuditHandler consumerProvider) { this.queueProvider = queueProvider; this.consumerProvider = consumerProvider; } @@ -120,12 +120,12 @@ public class AuditFileSpool implements Runnable { init(prop, null); } - public void init(Properties props, String basePropertyName) { + public boolean init(Properties props, String basePropertyName) { if (initDone) { logger.error("init() called more than once. queueProvider=" + queueProvider.getName() + ", consumerProvider=" + consumerProvider.getName()); - return; + return true; } String propPrefix = "xasecure.audit.filespool"; if (basePropertyName != null) { @@ -162,22 +162,22 @@ public class AuditFileSpool implements Runnable { + queueProvider.getName()); if (logFolderProp == null || logFolderProp.isEmpty()) { - logger.error("Audit spool folder is not configured. Please set " + logger.fatal("Audit spool folder is not configured. Please set " + propPrefix + "." + PROP_FILE_SPOOL_LOCAL_DIR + ". queueName=" + queueProvider.getName()); - return; + return false; } logFolder = new File(logFolderProp); if (!logFolder.isDirectory()) { logFolder.mkdirs(); if (!logFolder.isDirectory()) { - logger.error("File Spool folder not found and can't be created. folder=" + logger.fatal("File Spool folder not found and can't be created. folder=" + logFolder.getAbsolutePath() + ", queueName=" + queueProvider.getName()); - return; + return false; } } logger.info("logFolder=" + logFolder + ", queueName=" @@ -202,7 +202,7 @@ public class AuditFileSpool implements Runnable { + archiveFolder.getAbsolutePath() + ", queueName=" + queueProvider.getName()); - return; + return false; } } logger.info("archiveFolder=" + archiveFolder + ", queueName=" @@ -218,17 +218,30 @@ public class AuditFileSpool implements Runnable { indexFile = new File(logFolder, indexFileName); if (!indexFile.exists()) { - indexFile.createNewFile(); + boolean ret = indexFile.createNewFile(); + if (!ret) { + logger.fatal("Error creating index file. fileName=" + + indexDoneFile.getPath()); + return false; + } } logger.info("indexFile=" + indexFile + ", queueName=" + queueProvider.getName()); int lastDot = indexFileName.lastIndexOf('.'); + if (lastDot < 0) { + lastDot = indexFileName.length() - 1; + } indexDoneFileName = indexFileName.substring(0, lastDot) + "_closed.json"; indexDoneFile = new File(logFolder, indexDoneFileName); if (!indexDoneFile.exists()) { - indexDoneFile.createNewFile(); + boolean ret = indexDoneFile.createNewFile(); + if (!ret) { + logger.fatal("Error creating index done file. fileName=" + + indexDoneFile.getPath()); + return false; + } } logger.info("indexDoneFile=" + indexDoneFile + ", queueName=" + queueProvider.getName()); @@ -252,8 +265,6 @@ public class AuditFileSpool implements Runnable { } } printIndex(); - // One more loop to add the rest of the pending records in reverse - // order for (int i = 0; i < indexRecords.size(); i++) { AuditIndexRecord auditIndexRecord = indexRecords.get(i); if (auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) { @@ -261,18 +272,19 @@ public class AuditFileSpool implements Runnable { if (!consumerFile.exists()) { logger.error("INIT: Consumer file=" + consumerFile.getPath() + " not found."); - System.exit(1); + } else { + indexQueue.add(auditIndexRecord); } - indexQueue.add(auditIndexRecord); } } } catch (Throwable t) { logger.fatal("Error initializing File Spooler. queue=" + queueProvider.getName(), t); - return; + return false; } initDone = true; + return true; } /** @@ -328,6 +340,7 @@ public class AuditFileSpool implements Runnable { out.flush(); out.close(); + break; } catch (Throwable t) { logger.debug("Error closing spool out file.", t); } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/4f3cea22/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java new file mode 100644 index 0000000..4c3ac5f --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditQueue.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.queue; + +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.provider.AuditHandler; +import org.apache.ranger.audit.provider.BaseAuditHandler; +import org.apache.ranger.audit.provider.MiscUtil; + +public abstract class AuditQueue extends BaseAuditHandler { + private static final Log LOG = LogFactory.getLog(AuditQueue.class); + + public static final int AUDIT_MAX_QUEUE_SIZE_DEFAULT = 1024 * 1024; + public static final int AUDIT_BATCH_INTERVAL_DEFAULT_MS = 1000; + public static final int AUDIT_BATCH_SIZE_DEFAULT = 1000; + + private int maxQueueSize = AUDIT_MAX_QUEUE_SIZE_DEFAULT; + private int maxBatchInterval = AUDIT_BATCH_INTERVAL_DEFAULT_MS; + private int maxBatchSize = AUDIT_BATCH_SIZE_DEFAULT; + + public static final String PROP_QUEUE = "queue"; + + public static final String PROP_BATCH_SIZE = "batch.size"; + public static final String PROP_QUEUE_SIZE = "queue.size"; + public static final String PROP_BATCH_INTERVAL = "batch.interval.ms"; + + public static final String PROP_FILE_SPOOL_ENABLE = "filespool.enable"; + public static final String PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN = "filespool.drain.full.wait.ms"; + public static final String PROP_FILE_SPOOL_QUEUE_THRESHOLD = "filespool.drain.threshold.percent"; + + final protected AuditHandler consumer; + protected AuditFileSpool fileSpooler = null; + + private boolean isDrain = false; + + protected boolean fileSpoolerEnabled = false; + protected int fileSpoolMaxWaitTime = 5 * 60 * 1000; // Default 5 minutes + protected int fileSpoolDrainThresholdPercent = 80; + + /** + * @param consumer + */ + public AuditQueue(AuditHandler consumer) { + this.consumer = consumer; + } + + @Override + public void init(Properties props, String basePropertyName) { + LOG.info("BaseAuditProvider.init()"); + super.init(props, basePropertyName); + + setMaxBatchSize(MiscUtil.getIntProperty(props, propPrefix + "." + + PROP_BATCH_SIZE, getMaxBatchSize())); + setMaxQueueSize(MiscUtil.getIntProperty(props, propPrefix + "." + + PROP_QUEUE_SIZE, getMaxQueueSize())); + setMaxBatchInterval(MiscUtil.getIntProperty(props, propPrefix + "." + + PROP_BATCH_INTERVAL, getMaxBatchInterval())); + + fileSpoolerEnabled = MiscUtil.getBooleanProperty(props, propPrefix + + "." + PROP_FILE_SPOOL_ENABLE, false); + String logFolderProp = MiscUtil.getStringProperty(props, propPrefix + + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR); + if (fileSpoolerEnabled || logFolderProp != null) { + LOG.info("File spool is enabled for " + getName() + + ", logFolderProp=" + logFolderProp + ", " + propPrefix + + "." + AuditFileSpool.PROP_FILE_SPOOL_LOCAL_DIR + "=" + + fileSpoolerEnabled); + fileSpoolerEnabled = true; + fileSpoolMaxWaitTime = MiscUtil.getIntProperty(props, propPrefix + + "." + PROP_FILE_SPOOL_WAIT_FOR_FULL_DRAIN, + fileSpoolMaxWaitTime); + fileSpoolDrainThresholdPercent = MiscUtil.getIntProperty(props, + propPrefix + "." + PROP_FILE_SPOOL_QUEUE_THRESHOLD, + fileSpoolDrainThresholdPercent); + fileSpooler = new AuditFileSpool(this, consumer); + if (!fileSpooler.init(props, basePropertyName)) { + fileSpoolerEnabled = false; + LOG.fatal("Couldn't initialize file spooler. Disabling it. queue=" + + getName() + ", consumer=" + consumer.getName()); + } + } else { + LOG.info("File spool is disabled for " + getName()); + } + + } + + public AuditHandler getConsumer() { + return consumer; + } + + public boolean isDrain() { + return isDrain; + } + + public void setDrain(boolean isDrain) { + this.isDrain = isDrain; + } + + public int getMaxQueueSize() { + return maxQueueSize; + } + + public void setMaxQueueSize(int maxQueueSize) { + this.maxQueueSize = maxQueueSize; + } + + public int getMaxBatchInterval() { + return maxBatchInterval; + } + + public void setMaxBatchInterval(int maxBatchInterval) { + this.maxBatchInterval = maxBatchInterval; + } + + public int getMaxBatchSize() { + return maxBatchSize; + } + + public void setMaxBatchSize(int maxBatchSize) { + this.maxBatchSize = maxBatchSize; + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete() + */ + @Override + public void waitToComplete() { + if (consumer != null) { + consumer.waitToComplete(-1); + } + } + + @Override + public void waitToComplete(long timeout) { + if (consumer != null) { + consumer.waitToComplete(timeout); + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#flush() + */ + @Override + public void flush() { + if (consumer != null) { + consumer.flush(); + } + } + +}
