http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java index ec5e9a8..ab6a74a 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BufferedAuditProvider.java @@ -24,7 +24,7 @@ import org.apache.ranger.audit.model.AuditEventBase; import org.apache.ranger.audit.model.AuthzAuditEvent; public abstract class BufferedAuditProvider extends BaseAuditProvider { - private LogBuffer<AuditEventBase> mBuffer = null; + private LogBuffer<AuditEventBase> mBuffer = null; private LogDestination<AuditEventBase> mDestination = null; @Override @@ -34,34 +34,39 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider { @Override public boolean log(AuditEventBase event) { - if(event instanceof AuthzAuditEvent) { - AuthzAuditEvent authzEvent = (AuthzAuditEvent)event; + if (event instanceof AuthzAuditEvent) { + AuthzAuditEvent authzEvent = (AuthzAuditEvent) event; - if(authzEvent.getAgentHostname() == null) { + if (authzEvent.getAgentHostname() == null) { authzEvent.setAgentHostname(MiscUtil.getHostname()); } - if(authzEvent.getLogType() == null) { + if (authzEvent.getLogType() == null) { authzEvent.setLogType("RangerAudit"); } - if(authzEvent.getEventId() == null) { + if (authzEvent.getEventId() == null) { authzEvent.setEventId(MiscUtil.generateUniqueId()); } } - if(! 
mBuffer.add(event)) { + if (!mBuffer.add(event)) { logFailedEvent(event); + return false; } return true; } @Override public boolean log(Collection<AuditEventBase> events) { + boolean ret = true; for (AuditEventBase event : events) { - log(event); + ret = log(event); + if (!ret) { + break; + } } - return true; + return ret; } @Override @@ -73,8 +78,12 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider { @Override public boolean logJSON(Collection<String> events) { + boolean ret = true; for (String event : events) { - logJSON(event); + ret = logJSON(event); + if (!ret) { + break; + } } return false; } @@ -93,7 +102,6 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider { public void waitToComplete() { } - @Override public void waitToComplete(long timeout) { } @@ -120,9 +128,9 @@ public abstract class BufferedAuditProvider extends BaseAuditProvider { return mDestination; } - protected void setBufferAndDestination(LogBuffer<AuditEventBase> buffer, - LogDestination<AuditEventBase> destination) { - mBuffer = buffer; + protected void setBufferAndDestination(LogBuffer<AuditEventBase> buffer, + LogDestination<AuditEventBase> destination) { + mBuffer = buffer; mDestination = destination; } }
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java index f4976fb..f4bd90c 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java @@ -31,6 +31,7 @@ import javax.persistence.Persistence; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.audit.dao.DaoManager; +import org.apache.ranger.audit.destination.AuditDestination; import org.apache.ranger.audit.model.AuditEventBase; import org.apache.ranger.audit.model.AuthzAuditEvent; import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider; @@ -120,10 +121,14 @@ public class DbAuditProvider extends AuditDestination { @Override public boolean log(Collection<AuditEventBase> events) { + boolean ret = true; for (AuditEventBase event : events) { - log(event); + ret = log(event); + if(!ret) { + break; + } } - return true; + return ret; } @Override @@ -135,10 +140,14 @@ public class DbAuditProvider extends AuditDestination { @Override public boolean logJSON(Collection<String> events) { + boolean ret = true; for (String event : events) { - logJSON(event); + ret = logJSON(event); + if( !ret ) { + break; + } } - return false; + return ret; } @Override http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java deleted file mode 100644 index 62ecab1..0000000 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/FileAuditDestination.java +++ /dev/null @@ -1,230 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.ranger.audit.provider; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.List; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ranger.audit.model.AuditEventBase; - -/** - * This class write the logs to local file - */ -public class FileAuditDestination extends AuditDestination { - private static final Log logger = LogFactory - .getLog(FileAuditDestination.class); - - public static final String PROP_FILE_LOCAL_DIR = "dir"; - public static final String PROP_FILE_LOCAL_FILE_NAME_FORMAT = "filename.format"; - public static final String PROP_FILE_FILE_ROLLOVER = "file.rollover.sec"; - - String baseFolder = null; - String fileFormat = null; - int fileRolloverSec = 24 * 60 * 60; // In seconds - private String logFileNameFormat; - - boolean initDone = false; - - private File logFolder; - PrintWriter logWriter = null; - - private Date fileCreateTime = null; - - private String currentFileName; - - private boolean isStopped = false; - - @Override - public void init(Properties prop, String propPrefix) { - super.init(prop, propPrefix); - - // Initialize properties for this class - // Initial folder and file properties - String logFolderProp = MiscUtil.getStringProperty(props, propPrefix - + "." + PROP_FILE_LOCAL_DIR); - logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix + "." - + PROP_FILE_LOCAL_FILE_NAME_FORMAT); - fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "." - + PROP_FILE_FILE_ROLLOVER, fileRolloverSec); - - if (logFolderProp == null || logFolderProp.isEmpty()) { - logger.error("File destination folder is not configured. Please set " - + propPrefix - + "." - + PROP_FILE_LOCAL_DIR - + ". 
name=" - + getName()); - return; - } - logFolder = new File(logFolderProp); - if (!logFolder.isDirectory()) { - logFolder.mkdirs(); - if (!logFolder.isDirectory()) { - logger.error("FileDestination folder not found and can't be created. folder=" - + logFolder.getAbsolutePath() + ", name=" + getName()); - return; - } - } - logger.info("logFolder=" + logFolder + ", name=" + getName()); - - if (logFileNameFormat == null || logFileNameFormat.isEmpty()) { - logFileNameFormat = "%app-type%_ranger_audit.log"; - } - - logger.info("logFileNameFormat=" + logFileNameFormat + ", destName=" - + getName()); - - initDone = true; - } - - @Override - public boolean logJSON(Collection<String> events) { - try { - PrintWriter out = getLogFileStream(); - for (String event : events) { - out.println(event); - } - out.flush(); - } catch (Throwable t) { - logError("Error writing to log file.", t); - return false; - } - return true; - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection) - */ - @Override - synchronized public boolean log(Collection<AuditEventBase> events) { - if (isStopped) { - logError("log() called after stop was requested. name=" + getName()); - return false; - } - List<String> jsonList = new ArrayList<String>(); - for (AuditEventBase event : events) { - try { - jsonList.add(MiscUtil.stringify(event)); - } catch (Throwable t) { - logger.error("Error converting to JSON. event=" + event); - } - } - return logJSON(jsonList); - - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#start() - */ - @Override - public void start() { - // Nothing to do here. 
We will open the file when the first log request - // comes - } - - @Override - synchronized public void stop() { - if (logWriter != null) { - logWriter.flush(); - logWriter.close(); - logWriter = null; - isStopped = true; - } - } - - // Helper methods in this class - synchronized private PrintWriter getLogFileStream() throws Exception { - closeFileIfNeeded(); - - // Either there are no open log file or the previous one has been rolled - // over - if (logWriter == null) { - Date currentTime = new Date(); - // Create a new file - String fileName = MiscUtil.replaceTokens(logFileNameFormat, - currentTime.getTime()); - File outLogFile = new File(logFolder, fileName); - if (outLogFile.exists()) { - // Let's try to get the next available file - int i = 0; - while (true) { - i++; - int lastDot = fileName.lastIndexOf('.'); - String baseName = fileName.substring(0, lastDot); - String extension = fileName.substring(lastDot); - String newFileName = baseName + "." + i + extension; - File newLogFile = new File(logFolder, newFileName); - if (!newLogFile.exists()) { - // Move the file - if (!outLogFile.renameTo(newLogFile)) { - logger.error("Error renameing file. " + outLogFile - + " to " + newLogFile); - } - break; - } - } - } - if (!outLogFile.exists()) { - logger.info("Creating new file. destName=" + getName() - + ", fileName=" + fileName); - // Open the file - logWriter = new PrintWriter(new BufferedWriter(new FileWriter( - outLogFile))); - } else { - logWriter = new PrintWriter(new BufferedWriter(new FileWriter( - outLogFile, true))); - } - fileCreateTime = new Date(); - currentFileName = outLogFile.getPath(); - } - return logWriter; - } - - private void closeFileIfNeeded() throws FileNotFoundException, IOException { - if (logWriter == null) { - return; - } - if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) { - logger.info("Closing file. Rolling over. 
name=" + getName() - + ", fileName=" + currentFileName); - logWriter.flush(); - logWriter.close(); - logWriter = null; - currentFileName = null; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java deleted file mode 100644 index a36c40f..0000000 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/HDFSAuditDestination.java +++ /dev/null @@ -1,243 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.ranger.audit.provider; - -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.PrintWriter; -import java.net.URI; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.List; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.ranger.audit.model.AuditEventBase; - -/** - * This class write the logs to local file - */ -public class HDFSAuditDestination extends AuditDestination { - private static final Log logger = LogFactory - .getLog(HDFSAuditDestination.class); - - public static final String PROP_HDFS_DIR = "dir"; - public static final String PROP_HDFS_SUBDIR = "subdir"; - public static final String PROP_HDFS_FILE_NAME_FORMAT = "filename.format"; - public static final String PROP_HDFS_ROLLOVER = "file.rollover.sec"; - - String baseFolder = null; - String fileFormat = null; - int fileRolloverSec = 24 * 60 * 60; // In seconds - private String logFileNameFormat; - - boolean initDone = false; - - private String logFolder; - PrintWriter logWriter = null; - - private Date fileCreateTime = null; - - private String currentFileName; - - private boolean isStopped = false; - - @Override - public void init(Properties prop, String propPrefix) { - super.init(prop, propPrefix); - - // Initialize properties for this class - // Initial folder and file properties - String logFolderProp = MiscUtil.getStringProperty(props, propPrefix - + "." + PROP_HDFS_DIR); - String logSubFolder = MiscUtil.getStringProperty(props, propPrefix - + "." 
+ PROP_HDFS_SUBDIR); - if (logSubFolder == null || logSubFolder.isEmpty()) { - logSubFolder = "%app-type%/%time:yyyyMMdd%"; - } - - logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix + "." - + PROP_HDFS_FILE_NAME_FORMAT); - fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "." - + PROP_HDFS_ROLLOVER, fileRolloverSec); - - if (logFileNameFormat == null || logFileNameFormat.isEmpty()) { - logFileNameFormat = "%app-type%_ranger_audit_%hostname%" + ".log"; - } - - if (logFolderProp == null || logFolderProp.isEmpty()) { - logger.fatal("File destination folder is not configured. Please set " - + propPrefix + "." + PROP_HDFS_DIR + ". name=" + getName()); - return; - } - - logFolder = logFolderProp + "/" + logSubFolder; - logger.info("logFolder=" + logFolder + ", destName=" + getName()); - logger.info("logFileNameFormat=" + logFileNameFormat + ", destName=" - + getName()); - - initDone = true; - } - - @Override - public boolean logJSON(Collection<String> events) { - try { - PrintWriter out = getLogFileStream(); - for (String event : events) { - out.println(event); - } - out.flush(); - } catch (Throwable t) { - logError("Error writing to log file.", t); - return false; - } - return true; - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection) - */ - @Override - synchronized public boolean log(Collection<AuditEventBase> events) { - if (isStopped) { - logError("log() called after stop was requested. name=" + getName()); - return false; - } - List<String> jsonList = new ArrayList<String>(); - for (AuditEventBase event : events) { - try { - jsonList.add(MiscUtil.stringify(event)); - } catch (Throwable t) { - logger.error("Error converting to JSON. event=" + event); - } - } - return logJSON(jsonList); - - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#start() - */ - @Override - public void start() { - // Nothing to do here. 
We will open the file when the first log request - // comes - } - - @Override - synchronized public void stop() { - try { - if (logWriter != null) { - logWriter.flush(); - logWriter.close(); - logWriter = null; - isStopped = true; - } - } catch (Throwable t) { - logger.error("Error closing HDFS file.", t); - } - } - - // Helper methods in this class - synchronized private PrintWriter getLogFileStream() throws Throwable { - closeFileIfNeeded(); - - // Either there are no open log file or the previous one has been rolled - // over - if (logWriter == null) { - Date currentTime = new Date(); - // Create a new file - String fileName = MiscUtil.replaceTokens(logFileNameFormat, - currentTime.getTime()); - String parentFolder = MiscUtil.replaceTokens(logFolder, - currentTime.getTime()); - Configuration conf = new Configuration(); - - String fullPath = parentFolder - + org.apache.hadoop.fs.Path.SEPARATOR + fileName; - String defaultPath = fullPath; - URI uri = URI.create(fullPath); - FileSystem fileSystem = FileSystem.get(uri, conf); - - Path hdfPath = new Path(fullPath); - logger.info("Checking whether log file exists. hdfPath=" + fullPath); - int i = 0; - while (fileSystem.exists(hdfPath)) { - i++; - int lastDot = defaultPath.lastIndexOf('.'); - String baseName = defaultPath.substring(0, lastDot); - String extension = defaultPath.substring(lastDot); - fullPath = baseName + "." + i + extension; - hdfPath = new Path(fullPath); - logger.info("Checking whether log file exists. hdfPath=" + fullPath); - } - logger.info("Log file doesn't exists. Will create and use it. hdfPath=" + fullPath); - // Create parent folders - createParents(hdfPath, fileSystem); - - // Create the file to write - logger.info("Creating new log file. 
hdfPath=" + fullPath); - FSDataOutputStream ostream = fileSystem.create(hdfPath); - logWriter = new PrintWriter(ostream); - fileCreateTime = new Date(); - currentFileName = fullPath; - } - return logWriter; - } - - private void createParents(Path pathLogfile, FileSystem fileSystem) - throws Throwable { - logger.info("Creating parent folder for " + pathLogfile); - Path parentPath = pathLogfile != null ? pathLogfile.getParent() : null; - - if (parentPath != null && fileSystem != null - && !fileSystem.exists(parentPath)) { - fileSystem.mkdirs(parentPath); - } - } - - private void closeFileIfNeeded() throws FileNotFoundException, IOException { - if (logWriter == null) { - return; - } - // TODO: Close the file on absolute time. Currently it is implemented as - // relative time - if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) { - logger.info("Closing file. Rolling over. name=" + getName() - + ", fileName=" + currentFileName); - logWriter.flush(); - logWriter.close(); - logWriter = null; - currentFileName = null; - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java index a5a52a0..040a045 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java @@ -23,6 +23,7 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.destination.AuditDestination; import org.apache.ranger.audit.model.AuditEventBase; import org.apache.ranger.audit.model.AuthzAuditEvent; 
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java index 57ac0a0..876fa5b 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java @@ -69,6 +69,8 @@ public class MultiDestAuditProvider extends BaseAuditProvider { public void addAuditProviders(List<AuditProvider> providers) { if (providers != null) { for (AuditProvider provider : providers) { + LOG.info("Adding " + provider.getName() + + " as consumer to MultiDestination " + getName()); addAuditProvider(provider); } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java new file mode 100644 index 0000000..a6f291d --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditAsyncQueue.java @@ -0,0 +1,174 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.queue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.LinkedTransferQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.AuditProvider; +import org.apache.ranger.audit.provider.BaseAuditProvider; + +/** + * This is a non-blocking queue with no limit on capacity. + */ +public class AuditAsyncQueue extends BaseAuditProvider implements Runnable { + private static final Log logger = LogFactory.getLog(AuditAsyncQueue.class); + + LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>(); + Thread consumerThread = null; + + static final int MAX_DRAIN = 1000; + static int threadCount = 0; + static final String DEFAULT_NAME = "async"; + + public AuditAsyncQueue() { + setName(DEFAULT_NAME); + } + + public AuditAsyncQueue(AuditProvider consumer) { + super(consumer); + setName(DEFAULT_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger. 
+ * audit.model.AuditEventBase) + */ + @Override + public boolean log(AuditEventBase event) { + // Add to the queue and return ASAP + if (queue.size() >= getMaxQueueSize()) { + return false; + } + queue.add(event); + addLifeTimeInLogCount(1); + return true; + } + + @Override + public boolean log(Collection<AuditEventBase> events) { + boolean ret = true; + for (AuditEventBase event : events) { + ret = log(event); + if (!ret) { + break; + } + } + return ret; + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#start() + */ + @Override + public void start() { + if (consumer != null) { + consumer.start(); + } + + consumerThread = new Thread(this, this.getClass().getName() + + (threadCount++)); + consumerThread.setDaemon(true); + consumerThread.start(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#stop() + */ + @Override + public void stop() { + setDrain(true); + try { + if (consumerThread != null) { + consumerThread.interrupt(); + } + consumerThread = null; + } catch (Throwable t) { + // ignore any exception + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() + */ + @Override + public boolean isFlushPending() { + if (queue.isEmpty()) { + return consumer.isFlushPending(); + } + return true; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + while (true) { + try { + AuditEventBase event = null; + if (!isDrain()) { + // For Transfer queue take() is blocking + event = queue.take(); + } else { + // For Transfer queue poll() is non blocking + event = queue.poll(); + } + if (event != null) { + Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>(); + eventList.add(event); + queue.drainTo(eventList, MAX_DRAIN - 1); + consumer.log(eventList); + } + } catch (InterruptedException e) { + logger.info( + "Caught exception in consumer thread. 
Mostly to about loop", + e); + } catch (Throwable t) { + logger.error("Caught error during processing request.", t); + } + if (isDrain() && queue.isEmpty()) { + break; + } + } + try { + // Call stop on the consumer + consumer.stop(); + } catch (Throwable t) { + logger.error("Error while calling stop on consumer.", t); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java new file mode 100644 index 0000000..5e21efc --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ranger.audit.queue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Properties; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.AuditProvider; +import org.apache.ranger.audit.provider.BaseAuditProvider; + +public class AuditBatchQueue extends BaseAuditProvider implements Runnable { + private static final Log logger = LogFactory.getLog(AuditBatchQueue.class); + + private BlockingQueue<AuditEventBase> queue = null; + private Collection<AuditEventBase> localBatchBuffer = new ArrayList<AuditEventBase>(); + + Thread consumerThread = null; + static int threadCount = 0; + + public AuditBatchQueue() { + } + + public AuditBatchQueue(AuditProvider consumer) { + super(consumer); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger. + * audit.model.AuditEventBase) + */ + @Override + public boolean log(AuditEventBase event) { + // Add to batchQueue. Block if full + queue.add(event); + addLifeTimeInLogCount(1); + return true; + } + + @Override + public boolean log(Collection<AuditEventBase> events) { + boolean ret = true; + for (AuditEventBase event : events) { + ret = log(event); + if (!ret) { + break; + } + } + return ret; + } + + @Override + public void init(Properties prop, String basePropertyName) { + String propPrefix = "xasecure.audit.batch"; + if (basePropertyName != null) { + propPrefix = basePropertyName; + } + + super.init(prop, propPrefix); + + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#start() + */ + @Override + synchronized public void start() { + if (consumerThread != null) { + logger.error("Provider is already started. 
name=" + getName()); + return; + } + logger.info("Creating ArrayBlockingQueue with maxSize=" + + getMaxQueueSize()); + queue = new ArrayBlockingQueue<AuditEventBase>(getMaxQueueSize()); + + // Start the consumer first + consumer.start(); + + // Then the FileSpooler + if (fileSpoolerEnabled) { + fileSpooler.start(); + } + + // Finally the queue listener + consumerThread = new Thread(this, this.getClass().getName() + + (threadCount++)); + consumerThread.setDaemon(true); + consumerThread.start(); + + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#stop() + */ + @Override + public void stop() { + setDrain(true); + flush(); + try { + if (consumerThread != null) { + consumerThread.interrupt(); + } + consumerThread = null; + } catch (Throwable t) { + // ignore any exception + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete() + */ + @Override + public void waitToComplete() { + int defaultTimeOut = -1; + waitToComplete(defaultTimeOut); + consumer.waitToComplete(defaultTimeOut); + } + + @Override + public void waitToComplete(long timeout) { + setDrain(true); + flush(); + long sleepTime = 1000; + long startTime = System.currentTimeMillis(); + int prevQueueSize = -1; + int staticLoopCount = 0; + while ((queue.size() > 0 || localBatchBuffer.size() > 0)) { + if (prevQueueSize == queue.size()) { + logger.error("Queue size is not changing. " + getName() + + ".size=" + queue.size()); + staticLoopCount++; + if (staticLoopCount > 5) { + logger.error("Aborting writing to consumer. Some logs will be discarded." 
+ + getName() + ".size=" + queue.size()); + } + } else { + staticLoopCount = 0; + } + if (consumerThread != null) { + consumerThread.interrupt(); + } + try { + Thread.sleep(sleepTime); + if (timeout > 0 + && (System.currentTimeMillis() - startTime > timeout)) { + break; + } + } catch (InterruptedException e) { + break; + } + } + consumer.waitToComplete(timeout); + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() + */ + @Override + public boolean isFlushPending() { + if (queue.isEmpty()) { + return consumer.isFlushPending(); + } + return true; + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#flush() + */ + @Override + public void flush() { + if (fileSpoolerEnabled) { + fileSpooler.flush(); + } + consumer.flush(); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + long lastDispatchTime = System.currentTimeMillis(); + boolean isDestActive = true; + while (true) { + // Time to next dispatch + long nextDispatchDuration = lastDispatchTime + - System.currentTimeMillis() + getMaxBatchInterval(); + + boolean isToSpool = false; + boolean fileSpoolDrain = false; + try { + if (fileSpoolerEnabled && fileSpooler.isPending()) { + int percentUsed = (getMaxQueueSize() - queue.size()) * 100 + / getMaxQueueSize(); + long lastAttemptDelta = fileSpooler + .getLastAttemptTimeDelta(); + + fileSpoolDrain = lastAttemptDelta > fileSpoolMaxWaitTime; + // If we should even read from queue? 
+ if (!isDrain() && !fileSpoolDrain + && percentUsed < fileSpoolDrainThresholdPercent) { + // Since some files are still under progress and it is + // not in drain mode, lets wait and retry + if (nextDispatchDuration > 0) { + Thread.sleep(nextDispatchDuration); + lastDispatchTime = System.currentTimeMillis(); + } + continue; + } + isToSpool = true; + } + + AuditEventBase event = null; + + if (!isToSpool && !isDrain() && !fileSpoolDrain + && nextDispatchDuration > 0) { + event = queue.poll(nextDispatchDuration, + TimeUnit.MILLISECONDS); + } else { + // For poll() is non blocking + event = queue.poll(); + } + + if (event != null) { + localBatchBuffer.add(event); + if (getMaxBatchSize() >= localBatchBuffer.size()) { + queue.drainTo(localBatchBuffer, getMaxBatchSize() + - localBatchBuffer.size()); + } + } else { + // poll returned due to timeout, so reseting clock + nextDispatchDuration = lastDispatchTime + - System.currentTimeMillis() + + getMaxBatchInterval(); + + lastDispatchTime = System.currentTimeMillis(); + } + } catch (InterruptedException e) { + logger.info( + "Caught exception in consumer thread. Mostly to abort loop", + e); + setDrain(true); + } catch (Throwable t) { + logger.error("Caught error during processing request.", t); + } + + if (localBatchBuffer.size() > 0 && isToSpool) { + // Let spool to the file directly + if (isDestActive) { + logger.info("Switching to file spool. Queue=" + getName() + + ", dest=" + consumer.getName()); + } + isDestActive = false; + // Just before stashing + lastDispatchTime = System.currentTimeMillis(); + fileSpooler.stashLogs(localBatchBuffer); + localBatchBuffer.clear(); + } else if (localBatchBuffer.size() > 0 + && (isDrain() + || localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) { + if (fileSpoolerEnabled && !isDestActive) { + logger.info("Switching to writing to destination. 
Queue=" + + getName() + ", dest=" + consumer.getName()); + } + // Reset time just before sending the logs + lastDispatchTime = System.currentTimeMillis(); + boolean ret = consumer.log(localBatchBuffer); + if (!ret) { + if (fileSpoolerEnabled) { + logger.info("Switching to file spool. Queue=" + + getName() + ", dest=" + consumer.getName()); + // Transient error. Stash and move on + fileSpooler.stashLogs(localBatchBuffer); + isDestActive = false; + } else { + // We need to drop this event + logFailedEvent(localBatchBuffer, null); + } + } else { + isDestActive = true; + } + localBatchBuffer.clear(); + } + + if (isDrain()) { + if (!queue.isEmpty() || localBatchBuffer.size() > 0) { + logger.info("Queue is not empty. Will retry. queue.size)=" + + queue.size() + ", localBatchBuffer.size()=" + + localBatchBuffer.size()); + } else { + break; + } + } + } + + logger.info("Exiting consumerThread. Queue=" + getName() + ", dest=" + + consumer.getName()); + try { + // Call stop on the consumer + consumer.stop(); + if (fileSpoolerEnabled) { + fileSpooler.stop(); + } + } catch (Throwable t) { + logger.error("Error while calling stop on consumer.", t); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java new file mode 100644 index 0000000..66d1573 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditFileSpool.java @@ -0,0 +1,884 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */

package org.apache.ranger.audit.queue;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.File;
import java.io.FileFilter;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.Iterator;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.provider.AuditProvider;
import org.apache.ranger.audit.provider.MiscUtil;

import com.google.gson.Gson;
import com.google.gson.GsonBuilder;

/**
 * This class temporarily stores logs in file system if the destination is
 * overloaded or down.
 *
 * Events are appended as one JSON line each to a spool file in the spool
 * folder. Every spool file is tracked by an {@link AuditIndexRecord} in an
 * index file. A background thread ({@link #run()}) reads closed spool files
 * and forwards their lines to the consumer provider; files that were fully
 * delivered are moved to the archive folder and listed in a "closed" index
 * file.
 */
public class AuditFileSpool implements Runnable {
    private static final Log logger = LogFactory.getLog(AuditFileSpool.class);

    public enum SPOOL_FILE_STATUS {
        pending, write_inprogress, read_inprogress, done
    }

    public static final String PROP_FILE_SPOOL_LOCAL_DIR = "filespool.dir";
    public static final String PROP_FILE_SPOOL_LOCAL_FILE_NAME = "filespool.filename.format";
    public static final String PROP_FILE_SPOOL_ARCHIVE_DIR = "filespool.archive.dir";
    public static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = "filespool.archive.max.files";
    public static final String PROP_FILE_SPOOL_FILENAME_PREFIX = "filespool.file.prefix";
    public static final String PROP_FILE_SPOOL_FILE_ROLLOVER = "filespool.file.rollover.sec";
    public static final String PROP_FILE_SPOOL_INDEX_FILE = "filespool.index.filename";
    // public static final String PROP_FILE_SPOOL_INDEX_DONE_FILE =
    // "filespool.index.done_filename";
    public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = "filespool.destination.retry.ms";

    AuditProvider queueProvider = null;
    AuditProvider consumerProvider = null;

    // Spool files that are closed for writing and wait to be delivered
    BlockingQueue<AuditIndexRecord> indexQueue = new LinkedTransferQueue<AuditIndexRecord>();

    // Folder and File attributes
    File logFolder = null;
    String logFileNameFormat = null;
    File archiveFolder = null;
    String fileNamePrefix = null;
    String indexFileName = null;
    File indexFile = null;
    String indexDoneFileName = null;
    File indexDoneFile = null;
    int retryDestinationMS = 30 * 1000; // Default 30 seconds
    int fileRolloverSec = 24 * 60 * 60; // In seconds
    int maxArchiveFiles = 100;

    int errorLogIntervalMS = 30 * 1000; // Every 30 seconds
    long lastErrorLogMS = 0;

    List<AuditIndexRecord> indexRecords = new ArrayList<AuditIndexRecord>();

    // The flags below are written by one thread and read by another (the
    // stashing thread, the destination thread and whoever calls stop()), so
    // they are volatile to make the updates visible.
    volatile boolean isPending = false;
    volatile long lastAttemptTime = 0;
    boolean initDone = false;

    PrintWriter logWriter = null;
    AuditIndexRecord currentWriterIndexRecord = null;
    AuditIndexRecord currentConsumerIndexRecord = null;

    BufferedReader logReader = null;

    Thread destinationThread = null;

    volatile boolean isWriting = true;
    volatile boolean isDrain = false;
    volatile boolean isDestDown = true;

    private static Gson gson = null;

    /**
     * @param queueProvider
     *            provider that stashes events here; its name and max batch
     *            size are used
     * @param consumerProvider
     *            provider the spooled lines are forwarded to via
     *            {@code logJSON}
     */
    public AuditFileSpool(AuditProvider queueProvider,
            AuditProvider consumerProvider) {
        this.queueProvider = queueProvider;
        this.consumerProvider = consumerProvider;
    }

    /**
     * Initializes the spooler using the default property prefix.
     *
     * @param prop
     *            configuration properties
     */
    public void init(Properties prop) {
        init(prop, null);
    }

    /**
     * Reads the configuration, creates the spool and archive folders and the
     * two index files when missing, loads the existing index and queues all
     * files that still have to be delivered. On any failure an error is
     * logged and the spooler stays uninitialized.
     *
     * @param props
     *            configuration properties
     * @param basePropertyName
     *            prefix of the property keys; when null
     *            "xasecure.audit.filespool" is used
     */
    public void init(Properties props, String basePropertyName) {
        if (initDone) {
            logger.error("init() called more than once. queueProvider="
                    + queueProvider.getName() + ", consumerProvider="
                    + consumerProvider.getName());
            return;
        }
        String propPrefix = "xasecure.audit.filespool";
        if (basePropertyName != null) {
            propPrefix = basePropertyName;
        }

        try {
            gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
                    .create();

            // Initial folder and file properties
            String logFolderProp = MiscUtil.getStringProperty(props, propPrefix
                    + "." + PROP_FILE_SPOOL_LOCAL_DIR);
            // Use propPrefix like every other key. basePropertyName may be
            // null, which used to produce the key "null.filespool...".
            logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix
                    + "." + PROP_FILE_SPOOL_LOCAL_FILE_NAME);
            String archiveFolderProp = MiscUtil.getStringProperty(props,
                    propPrefix + "." + PROP_FILE_SPOOL_ARCHIVE_DIR);
            fileNamePrefix = MiscUtil.getStringProperty(props, propPrefix + "."
                    + PROP_FILE_SPOOL_FILENAME_PREFIX);
            indexFileName = MiscUtil.getStringProperty(props, propPrefix + "."
                    + PROP_FILE_SPOOL_INDEX_FILE);
            retryDestinationMS = MiscUtil.getIntProperty(props, propPrefix
                    + "." + PROP_FILE_SPOOL_DEST_RETRY_MS, retryDestinationMS);
            fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "."
                    + PROP_FILE_SPOOL_FILE_ROLLOVER, fileRolloverSec);
            maxArchiveFiles = MiscUtil.getIntProperty(props, propPrefix + "."
                    + PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, maxArchiveFiles);

            logger.info("retryDestinationMS=" + retryDestinationMS
                    + ", queueName=" + queueProvider.getName());
            logger.info("fileRolloverSec=" + fileRolloverSec + ", queueName="
                    + queueProvider.getName());
            logger.info("maxArchiveFiles=" + maxArchiveFiles + ", queueName="
                    + queueProvider.getName());

            if (logFolderProp == null || logFolderProp.isEmpty()) {
                logger.error("Audit spool folder is not configured. Please set "
                        + propPrefix
                        + "."
                        + PROP_FILE_SPOOL_LOCAL_DIR
                        + ". queueName=" + queueProvider.getName());
                return;
            }
            logFolder = new File(logFolderProp);
            if (!ensureFolder(logFolder, "File Spool folder")) {
                return;
            }
            logger.info("logFolder=" + logFolder + ", queueName="
                    + queueProvider.getName());

            if (logFileNameFormat == null || logFileNameFormat.isEmpty()) {
                logFileNameFormat = "spool_" + "%app-type%" + "_"
                        + "%time:yyyyMMdd-HHmm.ss%.log";
            }
            logger.info("logFileNameFormat=" + logFileNameFormat
                    + ", queueName=" + queueProvider.getName());

            if (archiveFolderProp == null || archiveFolderProp.isEmpty()) {
                archiveFolder = new File(logFolder, "archive");
            } else {
                archiveFolder = new File(archiveFolderProp);
            }
            if (!ensureFolder(archiveFolder, "File Spool archive folder")) {
                return;
            }
            logger.info("archiveFolder=" + archiveFolder + ", queueName="
                    + queueProvider.getName());

            if (indexFileName == null || indexFileName.isEmpty()) {
                if (fileNamePrefix == null || fileNamePrefix.isEmpty()) {
                    fileNamePrefix = queueProvider.getName() + "_"
                            + consumerProvider.getName();
                }
                indexFileName = "index_" + fileNamePrefix + ".json";
            }

            indexFile = new File(logFolder, indexFileName);
            if (!indexFile.exists()) {
                indexFile.createNewFile();
            }
            logger.info("indexFile=" + indexFile + ", queueName="
                    + queueProvider.getName());

            // A configured index file name may have no extension; in that
            // case the whole name is the base name.
            int lastDot = indexFileName.lastIndexOf('.');
            String indexBaseName = lastDot < 0 ? indexFileName : indexFileName
                    .substring(0, lastDot);
            indexDoneFileName = indexBaseName + "_closed.json";
            indexDoneFile = new File(logFolder, indexDoneFileName);
            if (!indexDoneFile.exists()) {
                indexDoneFile.createNewFile();
            }
            logger.info("indexDoneFile=" + indexDoneFile + ", queueName="
                    + queueProvider.getName());

            // Load index file
            loadIndexFile();
            for (AuditIndexRecord auditIndexRecord : indexRecords) {
                // "==" keeps the comparison safe if a record was loaded
                // without a status
                if (auditIndexRecord.status != SPOOL_FILE_STATUS.done) {
                    isPending = true;
                }
                if (auditIndexRecord.status == SPOOL_FILE_STATUS.write_inprogress) {
                    currentWriterIndexRecord = auditIndexRecord;
                    logger.info("currentWriterIndexRecord="
                            + currentWriterIndexRecord.filePath
                            + ", queueName=" + queueProvider.getName());
                }
                if (auditIndexRecord.status == SPOOL_FILE_STATUS.read_inprogress) {
                    // Partially delivered files go first
                    indexQueue.add(auditIndexRecord);
                }
            }
            printIndex();
            // One more loop to add the rest of the pending records, in index
            // order
            for (int i = 0; i < indexRecords.size(); i++) {
                AuditIndexRecord auditIndexRecord = indexRecords.get(i);
                if (auditIndexRecord.status == SPOOL_FILE_STATUS.pending) {
                    File consumerFile = new File(auditIndexRecord.filePath);
                    if (!consumerFile.exists()) {
                        logger.error("INIT: Consumer file="
                                + consumerFile.getPath() + " not found.");
                        // NOTE(review): this terminates the whole JVM because
                        // of one missing spool file, while run() only drops
                        // the index entry in the same situation - confirm
                        // this is intended.
                        System.exit(1);
                    }
                    indexQueue.add(auditIndexRecord);
                }
            }

        } catch (Throwable t) {
            logger.fatal("Error initializing File Spooler. queue="
                    + queueProvider.getName(), t);
            return;
        }
        initDone = true;
    }

    /**
     * Makes sure the given folder exists, creating it when necessary.
     *
     * @param folder
     *            folder to check or create
     * @param description
     *            text used at the start of the error message
     * @return false (after logging an error) if the folder does not exist and
     *         could not be created
     */
    private boolean ensureFolder(File folder, String description) {
        if (!folder.isDirectory()) {
            folder.mkdirs();
            if (!folder.isDirectory()) {
                logger.error(description
                        + " not found and can't be created. folder="
                        + folder.getAbsolutePath() + ", queueName="
                        + queueProvider.getName());
                return false;
            }
        }
        return true;
    }

    /**
     * Start looking for outstanding logs and update status according.
     */
    public void start() {
        if (!initDone) {
            logger.error("Cannot start Audit File Spooler. Initilization not done yet. queueName="
                    + queueProvider.getName());
            return;
        }

        logger.info("Starting writerThread, queueName="
                + queueProvider.getName() + ", consumer="
                + consumerProvider.getName());

        // Let's start the thread to read
        destinationThread = new Thread(this, queueProvider.getName()
                + "_destWriter");
        destinationThread.setDaemon(true);
        destinationThread.start();
    }

    /**
     * Switches to drain mode, flushes and closes the open spool file (waiting
     * up to three seconds for a write in progress) and interrupts the
     * destination thread.
     */
    public void stop() {
        if (!initDone) {
            logger.error("Cannot stop Audit File Spooler. Initilization not done. queueName="
                    + queueProvider.getName());
            return;
        }
        logger.info("Stop called, queueName=" + queueProvider.getName()
                + ", consumer=" + consumerProvider.getName());

        isDrain = true;
        flush();

        PrintWriter out = getOpenLogFileStream();
        if (out != null) {
            // If write is still going on, then let's give it enough time to
            // complete
            for (int i = 0; i < 3; i++) {
                if (isWriting) {
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        // ignore
                    }
                    continue;
                }
                try {
                    logger.info("Closing open file, queueName="
                            + queueProvider.getName() + ", consumer="
                            + consumerProvider.getName());

                    out.flush();
                    out.close();
                } catch (Throwable t) {
                    logger.debug("Error closing spool out file.", t);
                }
                // The file has been closed; do not close it again on the
                // remaining iterations.
                break;
            }
        }
        try {
            if (destinationThread != null) {
                destinationThread.interrupt();
            }
            destinationThread = null;
        } catch (Throwable e) {
            // ignore
        }
    }

    /**
     * Flushes the currently open spool file, if any.
     */
    public void flush() {
        if (!initDone) {
            logger.error("Cannot flush Audit File Spooler. Initilization not done. queueName="
                    + queueProvider.getName());
            return;
        }
        PrintWriter out = getOpenLogFileStream();
        if (out != null) {
            out.flush();
        }
    }

    /**
     * If any files are still not processed. Also, if the destination is not
     * reachable
     *
     * @return true while spooled data is waiting to be delivered; false if
     *         nothing is pending or the spooler is not initialized
     */
    public boolean isPending() {
        if (!initDone) {
            logError("isPending(): File Spooler not initialized. queueName="
                    + queueProvider.getName());
            return false;
        }

        return isPending;
    }

    /**
     * Milliseconds from last attempt time
     *
     * @return milliseconds since the last failed delivery attempt, or 0 if
     *         no attempt has failed so far
     */
    public long getLastAttemptTimeDelta() {
        if (lastAttemptTime == 0) {
            return 0;
        }
        return System.currentTimeMillis() - lastAttemptTime;
    }

    /**
     * Appends one event, converted to JSON, to the current spool file. Write
     * errors are logged and not propagated.
     *
     * @param event
     *            event to spool
     */
    synchronized public void stashLogs(AuditEventBase event) {
        if (isDrain) {
            // Stop has been called, so this method shouldn't be called
            logger.error("stashLogs() is called after stop is called. event="
                    + event);
            return;
        }
        try {
            isWriting = true;
            PrintWriter logOut = getLogFileStream();
            // Convert event to json
            String jsonStr = MiscUtil.stringify(event);
            logOut.println(jsonStr);
            isPending = true;
        } catch (Exception ex) {
            logger.error("Error writing to file. event=" + event, ex);
        } finally {
            isWriting = false;
        }

    }

    /**
     * Spools every event of the collection and flushes the spool file.
     *
     * @param events
     *            events to spool
     */
    synchronized public void stashLogs(Collection<AuditEventBase> events) {
        for (AuditEventBase event : events) {
            stashLogs(event);
        }
        flush();
    }

    /**
     * Appends one already serialized event line to the current spool file.
     * Write errors are logged and not propagated.
     *
     * @param event
     *            line to spool
     */
    synchronized public void stashLogsString(String event) {
        if (isDrain) {
            // Stop has been called, so this method shouldn't be called
            logger.error("stashLogs() is called after stop is called. event="
                    + event);
            return;
        }
        try {
            isWriting = true;
            PrintWriter logOut = getLogFileStream();
            logOut.println(event);
            // Same as stashLogs(AuditEventBase): spooled data now exists
            isPending = true;
        } catch (Exception ex) {
            logger.error("Error writing to file. event=" + event, ex);
        } finally {
            isWriting = false;
        }

    }

    /**
     * Spools every line of the collection and flushes the spool file.
     *
     * @param events
     *            lines to spool
     */
    synchronized public void stashLogsString(Collection<String> events) {
        for (String event : events) {
            stashLogsString(event);
        }
        flush();
    }

    /**
     * This return the current file. If there are not current open output file,
     * then it will return null
     *
     * @return the open spool writer or null
     */
    synchronized private PrintWriter getOpenLogFileStream() {
        return logWriter;
    }

    /**
     * Returns the writer for the current spool file. The current file is
     * closed first when {@link #closeFileIfNeeded()} decides so; a new file
     * with a unique name is then created and added to the index.
     *
     * @return writer for the current spool file
     * @throws Exception
     *             if the file cannot be opened or the index cannot be saved
     */
    synchronized private PrintWriter getLogFileStream() throws Exception {
        closeFileIfNeeded();

        // Either there are no open log file or the previous one has been rolled
        // over
        if (currentWriterIndexRecord == null) {
            Date currentTime = new Date();
            // Create a new file
            String fileName = MiscUtil.replaceTokens(logFileNameFormat,
                    currentTime.getTime());
            // Split once; a name without '.' has an empty extension
            int lastDot = fileName.lastIndexOf('.');
            String baseName = lastDot < 0 ? fileName : fileName.substring(0,
                    lastDot);
            String extension = lastDot < 0 ? "" : fileName.substring(lastDot);

            String newFileName = fileName;
            File outLogFile = null;
            int i = 0;
            while (true) {
                outLogFile = new File(logFolder, newFileName);
                File archiveLogFile = new File(archiveFolder, newFileName);
                if (!outLogFile.exists() && !archiveLogFile.exists()) {
                    break;
                }
                // Name is taken (spool or archive folder): add a counter
                i++;
                newFileName = baseName + "." + i + extension;
            }
            fileName = newFileName;
            logger.info("Creating new file. queueName="
                    + queueProvider.getName() + ", fileName=" + fileName);
            // Open the file
            logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
                    outLogFile)));

            AuditIndexRecord tmpIndexRecord = new AuditIndexRecord();

            tmpIndexRecord.id = MiscUtil.generateUniqueId();
            tmpIndexRecord.filePath = outLogFile.getPath();
            tmpIndexRecord.status = SPOOL_FILE_STATUS.write_inprogress;
            tmpIndexRecord.fileCreateTime = currentTime;
            tmpIndexRecord.lastAttempt = true;
            currentWriterIndexRecord = tmpIndexRecord;
            indexRecords.add(currentWriterIndexRecord);
            saveIndexFile();

        } else {
            if (logWriter == null) {
                // This means the process just started. We need to open the file
                // in append mode.
                logger.info("Opening existing file for append. queueName="
                        + queueProvider.getName() + ", fileName="
                        + currentWriterIndexRecord.filePath);
                logWriter = new PrintWriter(new BufferedWriter(new FileWriter(
                        currentWriterIndexRecord.filePath, true)));
            }
        }
        return logWriter;
    }

    /**
     * Closes the current spool file and queues it for delivery when it is the
     * only file in the index or when it is older than the rollover period.
     *
     * @throws FileNotFoundException
     *             if the index file cannot be opened
     * @throws IOException
     *             declared for callers; see {@link #saveIndexFile()}
     */
    synchronized private void closeFileIfNeeded() throws FileNotFoundException,
            IOException {
        // Is there file open to write or there are no pending file, then close
        // the active file
        if (currentWriterIndexRecord != null) {
            // Check whether the file needs to rolled
            boolean closeFile = false;
            if (indexRecords.size() == 1) {
                closeFile = true;
                logger.info("Closing file. Only one open file. queueName="
                        + queueProvider.getName() + ", fileName="
                        + currentWriterIndexRecord.filePath);
            } else if (System.currentTimeMillis()
                    - currentWriterIndexRecord.fileCreateTime.getTime() > fileRolloverSec * 1000L) {
                // 1000L: do the multiplication in long so that large
                // rollover values cannot overflow int
                closeFile = true;
                logger.info("Closing file. Rolling over. queueName="
                        + queueProvider.getName() + ", fileName="
                        + currentWriterIndexRecord.filePath);
            }
            if (closeFile) {
                // Roll the file
                if (logWriter != null) {
                    logWriter.flush();
                    logWriter.close();
                    logWriter = null;
                }
                currentWriterIndexRecord.status = SPOOL_FILE_STATUS.pending;
                currentWriterIndexRecord.writeCompleteTime = new Date();
                saveIndexFile();
                logger.info("Adding file to queue. queueName="
                        + queueProvider.getName() + ", fileName="
                        + currentWriterIndexRecord.filePath);
                indexQueue.add(currentWriterIndexRecord);
                currentWriterIndexRecord = null;
            }
        }
    }

    /**
     * Load the index file. Replaces the in-memory index with the records
     * found in the file; empty lines and lines starting with '#' are
     * skipped.
     *
     * @throws IOException
     *             if the index file cannot be read
     */
    void loadIndexFile() throws IOException {
        logger.info("Loading index file. fileName=" + indexFile.getPath());
        BufferedReader br = new BufferedReader(new FileReader(indexFile));
        try {
            indexRecords.clear();
            String line;
            while ((line = br.readLine()) != null) {
                if (!line.isEmpty() && !line.startsWith("#")) {
                    AuditIndexRecord record = gson.fromJson(line,
                            AuditIndexRecord.class);
                    indexRecords.add(record);
                }
            }
        } finally {
            // Close the reader even when a line cannot be parsed
            br.close();
        }
    }

    /**
     * Logs every index record together with whether its file exists.
     */
    synchronized void printIndex() {
        logger.info("INDEX printIndex() ==== START");
        Iterator<AuditIndexRecord> iter = indexRecords.iterator();
        while (iter.hasNext()) {
            AuditIndexRecord record = iter.next();
            logger.info("INDEX=" + record + ", isFileExist="
                    + (new File(record.filePath).exists()));
        }
        logger.info("INDEX printIndex() ==== END");
    }

    /**
     * Removes the record with the same id from the index, appends it to the
     * "closed" index, archives its file and saves the index.
     *
     * @param indexRecord
     *            record to remove
     * @throws FileNotFoundException
     *             if an index file cannot be opened
     * @throws IOException
     *             if writing an index file fails
     */
    synchronized void removeIndexRecord(AuditIndexRecord indexRecord)
            throws FileNotFoundException, IOException {
        Iterator<AuditIndexRecord> iter = indexRecords.iterator();
        while (iter.hasNext()) {
            AuditIndexRecord record = iter.next();
            if (record.id.equals(indexRecord.id)) {
                logger.info("Removing file from index. file=" + record.filePath
                        + ", queueName=" + queueProvider.getName()
                        + ", consumer=" + consumerProvider.getName());

                iter.remove();
                appendToDoneFile(record);
            }
        }
        saveIndexFile();
        // Nothing left in the index means nothing is left to deliver.
        // Without this reset isPending() stayed true forever once anything
        // had been spooled.
        if (indexRecords.isEmpty()) {
            isPending = false;
        }
    }

    /**
     * Rewrites the index file with the current in-memory records, one JSON
     * line per record.
     *
     * @throws FileNotFoundException
     *             if the index file cannot be opened for writing
     * @throws IOException
     *             declared for callers
     */
    synchronized void saveIndexFile() throws FileNotFoundException, IOException {
        PrintWriter out = new PrintWriter(indexFile);
        try {
            for (AuditIndexRecord auditIndexRecord : indexRecords) {
                out.println(gson.toJson(auditIndexRecord));
            }
        } finally {
            out.close();
        }
        // printIndex();

    }

    /**
     * Appends the record to the "closed" index file, moves its spool file to
     * the archive folder and removes the oldest archived files when more than
     * {@code maxArchiveFiles} are present. Problems while moving or deleting
     * files are logged and not propagated.
     *
     * @param indexRecord
     *            record of the fully processed spool file
     * @throws FileNotFoundException
     *             if the "closed" index file cannot be opened
     * @throws IOException
     *             if the "closed" index file cannot be written
     */
    void appendToDoneFile(AuditIndexRecord indexRecord)
            throws FileNotFoundException, IOException {
        logger.info("Moving to done file. " + indexRecord.filePath
                + ", queueName=" + queueProvider.getName() + ", consumer="
                + consumerProvider.getName());
        String line = gson.toJson(indexRecord);
        PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter(
                indexDoneFile, true)));
        try {
            out.println(line);
            out.flush();
        } finally {
            out.close();
        }

        archiveLogFile(indexRecord);
        purgeOldArchiveFiles();
    }

    /**
     * Moves the spool file of the record into the archive folder.
     */
    private void archiveLogFile(AuditIndexRecord indexRecord) {
        File logFile = null;
        File archiveFile = null;
        try {
            logFile = new File(indexRecord.filePath);
            String fileName = logFile.getName();
            archiveFile = new File(archiveFolder, fileName);
            logger.info("Moving logFile " + logFile + " to " + archiveFile);
            // renameTo() reports failure through its return value only
            if (!logFile.renameTo(archiveFile)) {
                logger.error("Error moving log file to archive folder. logFile="
                        + logFile + ", archiveFile=" + archiveFile);
            }
        } catch (Throwable t) {
            logger.error("Error moving log file to archive folder. logFile="
                    + logFile + ", archiveFile=" + archiveFile, t);
        }
    }

    /**
     * Deletes archived ".log" files, in the order they appear in the
     * "closed" index file, until the archive is back at the configured
     * maximum.
     */
    private void purgeOldArchiveFiles() {
        File archiveFile = null;
        try {
            // Remove old files
            File[] logFiles = archiveFolder.listFiles(new FileFilter() {
                public boolean accept(File pathname) {
                    return pathname.getName().toLowerCase().endsWith(".log");
                }
            });

            // listFiles() returns null if the folder cannot be listed
            if (logFiles != null && logFiles.length > maxArchiveFiles) {
                int filesToDelete = logFiles.length - maxArchiveFiles;
                BufferedReader br = new BufferedReader(new FileReader(
                        indexDoneFile));
                try {
                    int filesDeletedCount = 0;
                    String line;
                    while ((line = br.readLine()) != null) {
                        if (!line.isEmpty() && !line.startsWith("#")) {
                            AuditIndexRecord record = gson.fromJson(line,
                                    AuditIndexRecord.class);
                            File logFile = new File(record.filePath);
                            String fileName = logFile.getName();
                            archiveFile = new File(archiveFolder, fileName);
                            if (archiveFile.exists()) {
                                logger.info("Deleting archive file "
                                        + archiveFile);
                                boolean ret = archiveFile.delete();
                                if (!ret) {
                                    logger.error("Error deleting archive file. archiveFile="
                                            + archiveFile);
                                }
                                filesDeletedCount++;
                                if (filesDeletedCount >= filesToDelete) {
                                    logger.info("Deleted " + filesDeletedCount
                                            + " files");
                                    break;
                                }
                            }
                        }
                    }
                } finally {
                    br.close();
                }
            }
        } catch (Throwable t) {
            logger.error("Error deleting older archive file. archiveFile="
                    + archiveFile, t);
        }

    }

    /**
     * Logs an error at most once per {@code errorLogIntervalMS}; messages in
     * between are dropped.
     *
     * @param msg
     *            message to log
     */
    void logError(String msg) {
        long currTimeMS = System.currentTimeMillis();
        if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) {
            logger.error(msg);
            lastErrorLogMS = currTimeMS;
        }
    }

    /**
     * One line of the index file: the state of a single spool file.
     */
    class AuditIndexRecord {
        String id;
        String filePath;
        // Number of the last line that was delivered to the consumer
        int linePosition = 0;
        SPOOL_FILE_STATUS status = SPOOL_FILE_STATUS.write_inprogress;
        Date fileCreateTime;
        Date writeCompleteTime;
        Date doneCompleteTime;
        Date lastSuccessTime;
        Date lastFailedTime;
        int failedAttemptCount = 0;
        boolean lastAttempt = false;

        @Override
        public String toString() {
            return "AuditIndexRecord [id=" + id + ", filePath=" + filePath
                    + ", linePosition=" + linePosition + ", status=" + status
                    + ", fileCreateTime=" + fileCreateTime
                    + ", writeCompleteTime=" + writeCompleteTime
                    + ", doneCompleteTime=" + doneCompleteTime
                    + ", lastSuccessTime=" + lastSuccessTime
                    + ", lastFailedTime=" + lastFailedTime
                    + ", failedAttemptCount=" + failedAttemptCount
                    + ", lastAttempt=" + lastAttempt + "]";
        }

    }

    class AuditFileSpoolAttempt {
        Date attemptTime;
        String status;
    }

    /**
     * Destination thread. Takes closed spool files from the queue and
     * forwards their lines to the consumer in batches of
     * {@code queueProvider.getMaxBatchSize()}. A file that could not be
     * delivered completely is retried after {@code retryDestinationMS}. The
     * loop ends once drain mode is set.
     *
     * @see java.lang.Runnable#run()
     */
    @Override
    public void run() {
        while (true) {
            try {
                // Let's pause between each iteration
                if (currentConsumerIndexRecord == null) {
                    currentConsumerIndexRecord = indexQueue.poll(
                            retryDestinationMS, TimeUnit.MILLISECONDS);
                } else {
                    // Previous attempt for this file failed; wait before
                    // retrying
                    Thread.sleep(retryDestinationMS);
                }

                if (isDrain) {
                    // Need to exit
                    break;
                }
                if (currentConsumerIndexRecord == null) {
                    closeFileIfNeeded();
                    continue;
                }

                boolean isRemoveIndex = false;
                File consumerFile = new File(
                        currentConsumerIndexRecord.filePath);
                if (!consumerFile.exists()) {
                    logger.error("Consumer file=" + consumerFile.getPath()
                            + " not found.");
                    printIndex();
                    isRemoveIndex = true;
                } else {
                    isRemoveIndex = deliverFile(currentConsumerIndexRecord);
                }
                if (isRemoveIndex) {
                    // Remove this entry from index
                    removeIndexRecord(currentConsumerIndexRecord);
                    currentConsumerIndexRecord = null;
                    closeFileIfNeeded();
                }
            } catch (InterruptedException e) {
                // stop() sets isDrain and then interrupts this thread. Exit
                // right away instead of waiting for one more retry interval.
                if (isDrain) {
                    break;
                }
                logger.error("Exception in destination writing thread.", e);
            } catch (Throwable t) {
                logger.error("Exception in destination writing thread.", t);
            }
        }
        logger.info("Exiting file spooler. provider=" + queueProvider.getName()
                + ", consumer=" + consumerProvider.getName());
    }

    /**
     * Reads the spool file of the record and sends all lines that have not
     * been delivered yet.
     *
     * @return true if the whole file was delivered; false if delivery failed
     *         and the file has to be retried
     */
    private boolean deliverFile(AuditIndexRecord record) throws IOException {
        // Let's open the file to read
        BufferedReader br = new BufferedReader(new FileReader(record.filePath));
        try {
            int startLine = record.linePosition;
            String line;
            int currLine = 0;
            boolean isResumed = false;
            List<String> lines = new ArrayList<String>();
            while ((line = br.readLine()) != null) {
                currLine++;
                // linePosition is the number of the last delivered line, so
                // that line itself must be skipped as well ("<=", not "<");
                // otherwise it is sent again on every resume.
                if (currLine <= startLine) {
                    continue;
                }
                lines.add(line);
                if (lines.size() == queueProvider.getMaxBatchSize()) {
                    isResumed = sendBatch(lines, record, currLine, isResumed);
                }
            }
            if (lines.size() > 0) {
                isResumed = sendBatch(lines, record, currLine, isResumed);
            }
            logger.info("Done reading file. file=" + record.filePath
                    + ", queueName=" + queueProvider.getName()
                    + ", consumer=" + consumerProvider.getName());
            // The entire file is read
            record.status = SPOOL_FILE_STATUS.done;
            record.doneCompleteTime = new Date();
            record.lastAttempt = true;
            return true;
        } catch (Exception ex) {
            isDestDown = true;
            logError("Destination down. queueName=" + queueProvider.getName()
                    + ", consumer=" + consumerProvider.getName());
            lastAttemptTime = System.currentTimeMillis();
            // Update the index file
            record.lastFailedTime = new Date();
            record.failedAttemptCount++;
            record.lastAttempt = false;
            saveIndexFile();
            return false;
        } finally {
            br.close();
        }
    }

    /**
     * Sends one batch and clears the list. Logs "Started writing" only for
     * the first batch of a pass over the file.
     *
     * @return always true, meaning the start of the pass has been logged
     * @throws Exception
     *             if the consumer did not accept the batch
     */
    private boolean sendBatch(List<String> lines, AuditIndexRecord record,
            int currLine, boolean alreadyStarted) throws Exception {
        if (!sendEvent(lines, record, currLine)) {
            throw new Exception("Destination down");
        }
        if (!alreadyStarted) {
            logger.info("Started writing to destination. file="
                    + record.filePath + ", queueName="
                    + queueProvider.getName() + ", consumer="
                    + consumerProvider.getName());
        }
        lines.clear();
        return true;
    }

    /**
     * Sends the lines to the consumer and, on success, records the progress
     * in the index.
     *
     * @param lines
     *            JSON lines to send
     * @param indexRecord
     *            record of the file the lines were read from
     * @param currLine
     *            number of the last line contained in {@code lines}
     * @return true if the consumer accepted the lines; false if it rejected
     *         them or threw
     */
    private boolean sendEvent(List<String> lines, AuditIndexRecord indexRecord,
            int currLine) {
        boolean ret;
        try {
            ret = consumerProvider.logJSON(lines);
        } catch (Throwable t) {
            logger.error("Error while sending logs to consumer. provider="
                    + queueProvider.getName() + ", consumer="
                    + consumerProvider.getName() + ", log=" + lines, t);
            // A failed send must not be reported as delivered, otherwise
            // these lines would be skipped and lost.
            return false;
        }

        if (!ret) {
            // Need to log error after fixed interval
            logError("Error sending logs to consumer. provider="
                    + queueProvider.getName() + ", consumer="
                    + consumerProvider.getName());
            return false;
        }

        try {
            // Update index and save
            indexRecord.linePosition = currLine;
            indexRecord.status = SPOOL_FILE_STATUS.read_inprogress;
            indexRecord.lastSuccessTime = new Date();
            indexRecord.lastAttempt = true;
            saveIndexFile();

            if (isDestDown) {
                isDestDown = false;
                logger.info("Destination up now. " + indexRecord.filePath
                        + ", queueName=" + queueProvider.getName()
                        + ", consumer=" + consumerProvider.getName());
            }
        } catch (Throwable t) {
            // The lines were delivered; only the bookkeeping failed
            logger.error("Error while sending logs to consumer. provider="
                    + queueProvider.getName() + ", consumer="
                    + consumerProvider.getName() + ", log=" + lines, t);
        }

        return true;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java new file mode 100644 index 0000000..e102d8b --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditSummaryQueue.java @@ -0,0 +1,255 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License.
*/

package org.apache.ranger.audit.queue;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.TimeUnit;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.provider.AuditProvider;
import org.apache.ranger.audit.provider.BaseAuditProvider;
import org.apache.ranger.audit.provider.MiscUtil;

/**
 * Audit queue that merges events sharing the same event key
 * ({@link AuditEventBase#getEventKey()}) into one summarized event per key
 * and forwards the summaries to the consumer.
 *
 * <p>
 * {@link #log(AuditEventBase)} does not wait for space: an event is rejected
 * (the method returns {@code false}) when the queue already holds
 * {@code getMaxQueueSize()} entries. A daemon thread started by
 * {@link #start()} drains the queue, accumulates the summaries and sends them
 * to the consumer once the summary interval has elapsed, or on every pass
 * while the provider is draining.
 */
public class AuditSummaryQueue extends BaseAuditProvider implements Runnable {
    private static final Log logger = LogFactory
            .getLog(AuditSummaryQueue.class);

    /** Property suffix (appended to the provider prefix) for the summary interval. */
    public static final String PROP_SUMMARY_INTERVAL = "summary.interval.ms";

    LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>();
    Thread consumerThread = null;

    static int threadCount = 0;
    static final String DEFAULT_NAME = "summary";

    /** Upper bound on the number of events taken from the queue in one pass. */
    private static final int MAX_DRAIN = 100000;

    /** Interval, in milliseconds, between two dispatches of the summaries. */
    private int maxSummaryInterval = 5000;

    /** Summaries accumulated since the last dispatch, indexed by event key. */
    HashMap<String, AuditSummary> summaryMap = new HashMap<String, AuditSummary>();

    /**
     * Creates a queue without a consumer, named {@link #DEFAULT_NAME}.
     */
    public AuditSummaryQueue() {
        setName(DEFAULT_NAME);
    }

    /**
     * Creates a queue named {@link #DEFAULT_NAME} that forwards summaries to
     * the given consumer.
     *
     * @param consumer
     *            provider passed to the base class constructor
     */
    public AuditSummaryQueue(AuditProvider consumer) {
        super(consumer);
        setName(DEFAULT_NAME);
    }

    /**
     * Initializes the base provider and reads the summary interval from
     * {@code propPrefix + "." + PROP_SUMMARY_INTERVAL}; the current value is
     * used as the default when the property lookup falls back.
     *
     * @param props
     *            configuration properties
     * @param propPrefix
     *            prefix of the properties that belong to this provider
     */
    @Override
    public void init(Properties props, String propPrefix) {
        super.init(props, propPrefix);
        maxSummaryInterval = MiscUtil.getIntProperty(props, propPrefix + "."
                + PROP_SUMMARY_INTERVAL, maxSummaryInterval);
        logger.info("maxSummaryInterval=" + maxSummaryInterval + ", name="
                + getName());
    }

    /**
     * Enqueues a single event and returns as soon as possible.
     *
     * @param event
     *            event to enqueue
     * @return {@code false} if the queue already holds
     *         {@code getMaxQueueSize()} events and the event was not added,
     *         {@code true} otherwise
     */
    @Override
    public boolean log(AuditEventBase event) {
        // Add to the queue and return ASAP
        if (queue.size() >= getMaxQueueSize()) {
            return false;
        }
        queue.add(event);
        addLifeTimeInLogCount(1);
        return true;
    }

    /**
     * Enqueues the events one by one, stopping at the first one that is
     * rejected.
     *
     * @param events
     *            events to enqueue
     * @return {@code true} if every event was accepted (or the collection is
     *         empty), {@code false} as soon as one event is rejected
     */
    @Override
    public boolean log(Collection<AuditEventBase> events) {
        boolean ret = true;
        for (AuditEventBase event : events) {
            ret = log(event);
            if (!ret) {
                break;
            }
        }
        return ret;
    }

    /**
     * Starts the consumer, if there is one, and then the daemon thread that
     * executes {@link #run()}.
     */
    @Override
    public void start() {
        if (consumer != null) {
            consumer.start();
        }

        consumerThread = new Thread(this, this.getClass().getName()
                + (threadCount++));
        consumerThread.setDaemon(true);
        consumerThread.start();
    }

    /**
     * Switches the provider to drain mode and interrupts the consumer thread
     * so that a pending timed poll in {@link #run()} returns immediately.
     */
    @Override
    public void stop() {
        setDrain(true);
        try {
            if (consumerThread != null) {
                consumerThread.interrupt();
            }
            consumerThread = null;
        } catch (Throwable ignored) {
            // Best effort: a failure to interrupt must not propagate to the
            // caller of stop()
        }
    }

    /**
     * Tells whether events are still waiting to be processed.
     *
     * @return {@code true} if the queue is not empty; otherwise the answer of
     *         the consumer, or {@code false} when there is no consumer
     */
    @Override
    public boolean isFlushPending() {
        if (!queue.isEmpty()) {
            return true;
        }
        // The no-arg constructor leaves the consumer unset, so guard it the
        // same way start() does.
        return consumer != null && consumer.isFlushPending();
    }

    /**
     * Consumer loop: takes events from the queue, merges them into
     * {@link #summaryMap} and dispatches the summaries when the summary
     * interval has elapsed or the provider is draining. The loop ends once
     * the provider is draining and both the queue and the summary map are
     * empty; the consumer is then stopped.
     */
    @Override
    public void run() {
        long lastDispatchTime = System.currentTimeMillis();

        while (true) {
            // Time to next dispatch
            long nextDispatchDuration = lastDispatchTime
                    - System.currentTimeMillis() + maxSummaryInterval;

            Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>();

            try {
                AuditEventBase event = null;
                if (!isDrain() && nextDispatchDuration > 0) {
                    event = queue.poll(nextDispatchDuration,
                            TimeUnit.MILLISECONDS);
                } else {
                    // Dispatch is due or we are draining: do not block
                    event = queue.poll();
                }

                if (event != null) {
                    eventList.add(event);
                    queue.drainTo(eventList, MAX_DRAIN - 1);
                } else {
                    // poll returned due to timeout, so resetting the clock
                    nextDispatchDuration = lastDispatchTime
                            - System.currentTimeMillis() + maxSummaryInterval;
                    lastDispatchTime = System.currentTimeMillis();
                }
            } catch (InterruptedException e) {
                // The interrupt flag is deliberately not restored here:
                // the loop keeps running until the drain condition below is
                // met, and a set flag would make every timed poll throw.
                logger.info(
                        "Caught exception in consumer thread. Mostly to about loop",
                        e);
            } catch (Throwable t) {
                logger.error("Caught error during processing request.", t);
            }

            mergeIntoSummaryMap(eventList);

            if (isDrain() || nextDispatchDuration <= 0) {
                lastDispatchTime = dispatchSummaries(lastDispatchTime);
            }

            if (isDrain() && summaryMap.isEmpty() && queue.isEmpty()) {
                break;
            }
        }

        try {
            // Call stop on the consumer
            consumer.stop();
        } catch (Throwable t) {
            logger.error("Error while calling stop on consumer.", t);
        }
    }

    /**
     * Folds the given events into {@link #summaryMap}: the first event seen
     * for a key becomes the representative of its summary, later events only
     * advance the end time and the count.
     */
    private void mergeIntoSummaryMap(Collection<AuditEventBase> eventList) {
        for (AuditEventBase event : eventList) {
            String key = event.getEventKey();
            AuditSummary auditSummary = summaryMap.get(key);
            if (auditSummary == null) {
                auditSummary = new AuditSummary();
                auditSummary.event = event;
                auditSummary.startTime = event.getEventTime();
                auditSummary.endTime = event.getEventTime();
                auditSummary.count = 1;
                summaryMap.put(key, auditSummary);
            } else {
                auditSummary.endTime = event.getEventTime();
                auditSummary.count++;
            }
        }
    }

    /**
     * Sends every accumulated summary to the consumer and clears
     * {@link #summaryMap}. A summary that cannot be delivered (the consumer
     * is missing, refuses it or throws) is reported through
     * {@code logFailedEvent} and dropped, so that one bad event cannot
     * terminate the consumer thread.
     *
     * @param lastDispatchTime
     *            previous dispatch time, returned unchanged when there is
     *            nothing to send
     * @return the time taken just before the last summary was sent
     */
    private long dispatchSummaries(long lastDispatchTime) {
        long dispatchTime = lastDispatchTime;

        for (Map.Entry<String, AuditSummary> entry : summaryMap.entrySet()) {
            AuditSummary auditSummary = entry.getValue();
            boolean delivered = false;

            try {
                auditSummary.event.setEventCount(auditSummary.count);

                // Duration is at least 1 ms; it also stays at 1 ms when an
                // event carried no event time.
                long timeDiff = 1;
                if (auditSummary.startTime != null
                        && auditSummary.endTime != null) {
                    timeDiff = Math.max(1L, auditSummary.endTime.getTime()
                            - auditSummary.startTime.getTime());
                }
                auditSummary.event.setEventDurationMS(timeDiff);

                // Reset time just before sending the logs
                dispatchTime = System.currentTimeMillis();
                delivered = consumer != null
                        && consumer.log(auditSummary.event);
            } catch (Throwable t) {
                logger.error(
                        "Error while sending summarized event to consumer.",
                        t);
            }

            if (!delivered) {
                // We need to drop this event
                logFailedEvent(auditSummary.event, null);
            }
        }
        summaryMap.clear();

        return dispatchTime;
    }

    /**
     * Aggregation state for all events that share one event key.
     */
    class AuditSummary {
        /** Event time of the first event merged into this summary. */
        Date startTime = null;
        /** Event time of the most recent event merged into this summary. */
        Date endTime = null;
        /** Number of events merged into this summary. */
        int count = 0;
        /** First event seen for the key; it is the one sent to the consumer. */
        AuditEventBase event;
    }
}
