RANGER-276 Add support for aggregating audit logs at source

Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/236f1ba6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/236f1ba6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/236f1ba6
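This commit splits the audit pipeline into queues (org.apache.ranger.audit.queue: AuditAsyncQueue, AuditBatchQueue, AuditFileSpool, and the new AuditSummaryQueue, which appears to provide the at-source aggregation) and destinations (org.apache.ranger.audit.destination: FileAuditDestination, HDFSAuditDestination), with the new abstract AuditDestination class as the extension point for custom destinations. The sketch below shows what such a custom destination might look like. It is modeled on the FileAuditDestination added in this commit; the class name ConsoleAuditDestination is hypothetical, and any AuditProvider methods not visible in this diff may still need to be implemented.

    package org.apache.ranger.audit.destination;

    import java.util.Collection;
    import java.util.Properties;

    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.ranger.audit.model.AuditEventBase;
    import org.apache.ranger.audit.provider.MiscUtil;

    /**
     * Hypothetical custom destination, modeled on FileAuditDestination below.
     * It simply writes each event, serialized to JSON, to the application log.
     */
    public class ConsoleAuditDestination extends AuditDestination {
        private static final Log logger = LogFactory.getLog(ConsoleAuditDestination.class);

        @Override
        public void init(Properties prop, String propPrefix) {
            super.init(prop, propPrefix);
            // Destination-specific settings would be read here, e.g. with
            // MiscUtil.getStringProperty(props, propPrefix + ".some.option")
        }

        @Override
        public boolean log(Collection<AuditEventBase> events) {
            for (AuditEventBase event : events) {
                try {
                    // MiscUtil.stringify() is what the bundled destinations use
                    // to convert an event to its JSON form before writing it.
                    logger.info(MiscUtil.stringify(event));
                } catch (Throwable t) {
                    logError("Error writing audit event. name=" + getName(), t);
                    return false;
                }
            }
            return true;
        }

        @Override
        public void start() {
            // Nothing to open for this destination.
        }

        @Override
        public void stop() {
            // Nothing to close for this destination.
        }
    }

A destination like this would presumably be wired into the queue chain through AuditProviderFactory, which is also changed in this commit but not shown in this excerpt.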
Branch: refs/heads/master
Commit: 236f1ba67287725544c5a2d07a688d0894de46c3
Parents: 3adafa4
Author: Don Bosco Durai <[email protected]>
Authored: Mon Apr 20 10:23:47 2015 -0700
Committer: Don Bosco Durai <[email protected]>
Committed: Mon Apr 20 10:23:47 2015 -0700

----------------------------------------------------------------------
 .../audit/destination/AuditDestination.java     |  71 ++
 .../audit/destination/FileAuditDestination.java | 231 +++++
 .../audit/destination/HDFSAuditDestination.java | 244 +++++
 .../ranger/audit/model/AuditEventBase.java      |   9 +-
 .../ranger/audit/model/AuthzAuditEvent.java     | 247 +++---
 .../ranger/audit/provider/AuditAsyncQueue.java  | 167 ----
 .../audit/provider/AuditBatchProcessor.java     | 327 -------
 .../ranger/audit/provider/AuditDestination.java |  70 --
 .../ranger/audit/provider/AuditFileSpool.java   | 875 ------------------
 .../audit/provider/AuditProviderFactory.java    |  64 +-
 .../audit/provider/BaseAuditProvider.java       |   1 +
 .../audit/provider/BufferedAuditProvider.java   |  36 +-
 .../ranger/audit/provider/DbAuditProvider.java  |  17 +-
 .../audit/provider/FileAuditDestination.java    | 230 -----
 .../audit/provider/HDFSAuditDestination.java    | 243 -----
 .../audit/provider/Log4jAuditProvider.java      |   1 +
 .../audit/provider/MultiDestAuditProvider.java  |   2 +
 .../ranger/audit/queue/AuditAsyncQueue.java     | 174 ++++
 .../ranger/audit/queue/AuditBatchQueue.java     | 346 ++++++++
 .../ranger/audit/queue/AuditFileSpool.java      | 884 +++++++++++++++++++
 .../ranger/audit/queue/AuditSummaryQueue.java   | 255 ++++++
 .../apache/ranger/audit/TestAuditProcessor.java | 786 -----------------
 .../org/apache/ranger/audit/TestAuditQueue.java | 704 +++++++++++++++
 .../org/apache/ranger/audit/TestConsumer.java   | 248 ++++++
 24 files changed, 3390 insertions(+), 2842 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
----------------------------------------------------------------------
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
new file mode 100644
index 0000000..25c0220
--- /dev/null
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/AuditDestination.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */ + +package org.apache.ranger.audit.destination; + +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.provider.BaseAuditProvider; + +/** + * This class needs to be extended by anyone who wants to build custom + * destination + */ +public abstract class AuditDestination extends BaseAuditProvider { + private static final Log logger = LogFactory.getLog(AuditDestination.class); + + public AuditDestination() { + logger.info("AuditDestination() enter"); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties, + * java.lang.String) + */ + @Override + public void init(Properties prop, String basePropertyName) { + super.init(prop, basePropertyName); + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() + */ + @Override + public boolean isFlushPending() { + return false; + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#flush() + */ + @Override + public void flush() { + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java new file mode 100644 index 0000000..1ccfd5f --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/FileAuditDestination.java @@ -0,0 +1,231 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ranger.audit.destination; + +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.MiscUtil; + +/** + * This class write the logs to local file + */ +public class FileAuditDestination extends AuditDestination { + private static final Log logger = LogFactory + .getLog(FileAuditDestination.class); + + public static final String PROP_FILE_LOCAL_DIR = "dir"; + public static final String PROP_FILE_LOCAL_FILE_NAME_FORMAT = "filename.format"; + public static final String PROP_FILE_FILE_ROLLOVER = "file.rollover.sec"; + + String baseFolder = null; + String fileFormat = null; + int fileRolloverSec = 24 * 60 * 60; // In seconds + private String logFileNameFormat; + + boolean initDone = false; + + private File logFolder; + PrintWriter logWriter = null; + + private Date fileCreateTime = null; + + private String currentFileName; + + private boolean isStopped = false; + + @Override + public void init(Properties prop, String propPrefix) { + super.init(prop, propPrefix); + + // Initialize properties for this class + // Initial folder and file properties + String logFolderProp = MiscUtil.getStringProperty(props, propPrefix + + "." + PROP_FILE_LOCAL_DIR); + logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix + "." + + PROP_FILE_LOCAL_FILE_NAME_FORMAT); + fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "." + + PROP_FILE_FILE_ROLLOVER, fileRolloverSec); + + if (logFolderProp == null || logFolderProp.isEmpty()) { + logger.error("File destination folder is not configured. Please set " + + propPrefix + + "." + + PROP_FILE_LOCAL_DIR + + ". name=" + + getName()); + return; + } + logFolder = new File(logFolderProp); + if (!logFolder.isDirectory()) { + logFolder.mkdirs(); + if (!logFolder.isDirectory()) { + logger.error("FileDestination folder not found and can't be created. folder=" + + logFolder.getAbsolutePath() + ", name=" + getName()); + return; + } + } + logger.info("logFolder=" + logFolder + ", name=" + getName()); + + if (logFileNameFormat == null || logFileNameFormat.isEmpty()) { + logFileNameFormat = "%app-type%_ranger_audit.log"; + } + + logger.info("logFileNameFormat=" + logFileNameFormat + ", destName=" + + getName()); + + initDone = true; + } + + @Override + public boolean logJSON(Collection<String> events) { + try { + PrintWriter out = getLogFileStream(); + for (String event : events) { + out.println(event); + } + out.flush(); + } catch (Throwable t) { + logError("Error writing to log file.", t); + return false; + } + return true; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection) + */ + @Override + synchronized public boolean log(Collection<AuditEventBase> events) { + if (isStopped) { + logError("log() called after stop was requested. name=" + getName()); + return false; + } + List<String> jsonList = new ArrayList<String>(); + for (AuditEventBase event : events) { + try { + jsonList.add(MiscUtil.stringify(event)); + } catch (Throwable t) { + logger.error("Error converting to JSON. 
event=" + event); + } + } + return logJSON(jsonList); + + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#start() + */ + @Override + public void start() { + // Nothing to do here. We will open the file when the first log request + // comes + } + + @Override + synchronized public void stop() { + if (logWriter != null) { + logWriter.flush(); + logWriter.close(); + logWriter = null; + isStopped = true; + } + } + + // Helper methods in this class + synchronized private PrintWriter getLogFileStream() throws Exception { + closeFileIfNeeded(); + + // Either there are no open log file or the previous one has been rolled + // over + if (logWriter == null) { + Date currentTime = new Date(); + // Create a new file + String fileName = MiscUtil.replaceTokens(logFileNameFormat, + currentTime.getTime()); + File outLogFile = new File(logFolder, fileName); + if (outLogFile.exists()) { + // Let's try to get the next available file + int i = 0; + while (true) { + i++; + int lastDot = fileName.lastIndexOf('.'); + String baseName = fileName.substring(0, lastDot); + String extension = fileName.substring(lastDot); + String newFileName = baseName + "." + i + extension; + File newLogFile = new File(logFolder, newFileName); + if (!newLogFile.exists()) { + // Move the file + if (!outLogFile.renameTo(newLogFile)) { + logger.error("Error renameing file. " + outLogFile + + " to " + newLogFile); + } + break; + } + } + } + if (!outLogFile.exists()) { + logger.info("Creating new file. destName=" + getName() + + ", fileName=" + fileName); + // Open the file + logWriter = new PrintWriter(new BufferedWriter(new FileWriter( + outLogFile))); + } else { + logWriter = new PrintWriter(new BufferedWriter(new FileWriter( + outLogFile, true))); + } + fileCreateTime = new Date(); + currentFileName = outLogFile.getPath(); + } + return logWriter; + } + + private void closeFileIfNeeded() throws FileNotFoundException, IOException { + if (logWriter == null) { + return; + } + if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) { + logger.info("Closing file. Rolling over. name=" + getName() + + ", fileName=" + currentFileName); + logWriter.flush(); + logWriter.close(); + logWriter = null; + currentFileName = null; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java new file mode 100644 index 0000000..706eb8e --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java @@ -0,0 +1,244 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.destination; + +import java.io.FileNotFoundException; +import java.io.IOException; +import java.io.PrintWriter; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.MiscUtil; + +/** + * This class write the logs to local file + */ +public class HDFSAuditDestination extends AuditDestination { + private static final Log logger = LogFactory + .getLog(HDFSAuditDestination.class); + + public static final String PROP_HDFS_DIR = "dir"; + public static final String PROP_HDFS_SUBDIR = "subdir"; + public static final String PROP_HDFS_FILE_NAME_FORMAT = "filename.format"; + public static final String PROP_HDFS_ROLLOVER = "file.rollover.sec"; + + String baseFolder = null; + String fileFormat = null; + int fileRolloverSec = 24 * 60 * 60; // In seconds + private String logFileNameFormat; + + boolean initDone = false; + + private String logFolder; + PrintWriter logWriter = null; + + private Date fileCreateTime = null; + + private String currentFileName; + + private boolean isStopped = false; + + @Override + public void init(Properties prop, String propPrefix) { + super.init(prop, propPrefix); + + // Initialize properties for this class + // Initial folder and file properties + String logFolderProp = MiscUtil.getStringProperty(props, propPrefix + + "." + PROP_HDFS_DIR); + String logSubFolder = MiscUtil.getStringProperty(props, propPrefix + + "." + PROP_HDFS_SUBDIR); + if (logSubFolder == null || logSubFolder.isEmpty()) { + logSubFolder = "%app-type%/%time:yyyyMMdd%"; + } + + logFileNameFormat = MiscUtil.getStringProperty(props, propPrefix + "." + + PROP_HDFS_FILE_NAME_FORMAT); + fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "." + + PROP_HDFS_ROLLOVER, fileRolloverSec); + + if (logFileNameFormat == null || logFileNameFormat.isEmpty()) { + logFileNameFormat = "%app-type%_ranger_audit_%hostname%" + ".log"; + } + + if (logFolderProp == null || logFolderProp.isEmpty()) { + logger.fatal("File destination folder is not configured. Please set " + + propPrefix + "." + PROP_HDFS_DIR + ". 
name=" + getName()); + return; + } + + logFolder = logFolderProp + "/" + logSubFolder; + logger.info("logFolder=" + logFolder + ", destName=" + getName()); + logger.info("logFileNameFormat=" + logFileNameFormat + ", destName=" + + getName()); + + initDone = true; + } + + @Override + public boolean logJSON(Collection<String> events) { + try { + PrintWriter out = getLogFileStream(); + for (String event : events) { + out.println(event); + } + out.flush(); + } catch (Throwable t) { + logError("Error writing to log file.", t); + return false; + } + return true; + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditProvider#log(java.util.Collection) + */ + @Override + synchronized public boolean log(Collection<AuditEventBase> events) { + if (isStopped) { + logError("log() called after stop was requested. name=" + getName()); + return false; + } + List<String> jsonList = new ArrayList<String>(); + for (AuditEventBase event : events) { + try { + jsonList.add(MiscUtil.stringify(event)); + } catch (Throwable t) { + logger.error("Error converting to JSON. event=" + event); + } + } + return logJSON(jsonList); + + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#start() + */ + @Override + public void start() { + // Nothing to do here. We will open the file when the first log request + // comes + } + + @Override + synchronized public void stop() { + try { + if (logWriter != null) { + logWriter.flush(); + logWriter.close(); + logWriter = null; + isStopped = true; + } + } catch (Throwable t) { + logger.error("Error closing HDFS file.", t); + } + } + + // Helper methods in this class + synchronized private PrintWriter getLogFileStream() throws Throwable { + closeFileIfNeeded(); + + // Either there are no open log file or the previous one has been rolled + // over + if (logWriter == null) { + Date currentTime = new Date(); + // Create a new file + String fileName = MiscUtil.replaceTokens(logFileNameFormat, + currentTime.getTime()); + String parentFolder = MiscUtil.replaceTokens(logFolder, + currentTime.getTime()); + Configuration conf = new Configuration(); + + String fullPath = parentFolder + + org.apache.hadoop.fs.Path.SEPARATOR + fileName; + String defaultPath = fullPath; + URI uri = URI.create(fullPath); + FileSystem fileSystem = FileSystem.get(uri, conf); + + Path hdfPath = new Path(fullPath); + logger.info("Checking whether log file exists. hdfPath=" + fullPath); + int i = 0; + while (fileSystem.exists(hdfPath)) { + i++; + int lastDot = defaultPath.lastIndexOf('.'); + String baseName = defaultPath.substring(0, lastDot); + String extension = defaultPath.substring(lastDot); + fullPath = baseName + "." + i + extension; + hdfPath = new Path(fullPath); + logger.info("Checking whether log file exists. hdfPath=" + fullPath); + } + logger.info("Log file doesn't exists. Will create and use it. hdfPath=" + fullPath); + // Create parent folders + createParents(hdfPath, fileSystem); + + // Create the file to write + logger.info("Creating new log file. hdfPath=" + fullPath); + FSDataOutputStream ostream = fileSystem.create(hdfPath); + logWriter = new PrintWriter(ostream); + fileCreateTime = new Date(); + currentFileName = fullPath; + } + return logWriter; + } + + private void createParents(Path pathLogfile, FileSystem fileSystem) + throws Throwable { + logger.info("Creating parent folder for " + pathLogfile); + Path parentPath = pathLogfile != null ? 
pathLogfile.getParent() : null; + + if (parentPath != null && fileSystem != null + && !fileSystem.exists(parentPath)) { + fileSystem.mkdirs(parentPath); + } + } + + private void closeFileIfNeeded() throws FileNotFoundException, IOException { + if (logWriter == null) { + return; + } + // TODO: Close the file on absolute time. Currently it is implemented as + // relative time + if (System.currentTimeMillis() - fileCreateTime.getTime() > fileRolloverSec * 1000) { + logger.info("Closing file. Rolling over. name=" + getName() + + ", fileName=" + currentFileName); + logWriter.flush(); + logWriter.close(); + logWriter = null; + currentFileName = null; + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java index a44e047..39a2578 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java @@ -19,6 +19,8 @@ package org.apache.ranger.audit.model; +import java.util.Date; + import org.apache.ranger.audit.dao.DaoManager; public abstract class AuditEventBase { @@ -27,7 +29,12 @@ public abstract class AuditEventBase { } public abstract void persist(DaoManager daoManager); - + + public abstract String getEventKey(); + public abstract Date getEventTime (); + public abstract void setEventCount(long frequencyCount); + public abstract void setEventDurationMS(long frequencyDurationMS); + protected String trim(String str, int len) { String ret = str; if (str != null) { http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java index af89f60..d648de3 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java @@ -24,87 +24,87 @@ import java.util.Date; import org.apache.ranger.audit.dao.DaoManager; import org.apache.ranger.audit.entity.AuthzAuditEventDbObj; -import com.google.gson.Gson; import com.google.gson.annotations.SerializedName; - public class AuthzAuditEvent extends AuditEventBase { protected static String FIELD_SEPARATOR = ";"; - protected static final int MAX_ACTION_FIELD_SIZE = 1800 ; - protected static final int MAX_REQUEST_DATA_FIELD_SIZE = 1800 ; + protected static final int MAX_ACTION_FIELD_SIZE = 1800; + protected static final int MAX_REQUEST_DATA_FIELD_SIZE = 1800; @SerializedName("repoType") - protected int repositoryType = 0; + protected int repositoryType = 0; @SerializedName("repo") protected String repositoryName = null; @SerializedName("reqUser") - protected String user = null; + protected String user = null; @SerializedName("evtTime") - protected Date eventTime = new Date(); + protected Date eventTime = new Date(); @SerializedName("access") - protected String accessType = null; + protected String accessType = null; @SerializedName("resource") - protected String resourcePath = null; + protected String 
resourcePath = null; @SerializedName("resType") - protected String resourceType = null; + protected String resourceType = null; @SerializedName("action") - protected String action = null; + protected String action = null; @SerializedName("result") - protected short accessResult = 0; // 0 - DENIED; 1 - ALLOWED; HTTP return code + protected short accessResult = 0; // 0 - DENIED; 1 - ALLOWED; HTTP return + // code @SerializedName("agent") - protected String agentId = null; + protected String agentId = null; @SerializedName("policy") - protected long policyId = 0; + protected long policyId = 0; @SerializedName("reason") - protected String resultReason = null; + protected String resultReason = null; @SerializedName("enforcer") - protected String aclEnforcer = null; + protected String aclEnforcer = null; @SerializedName("sess") - protected String sessionId = null; + protected String sessionId = null; @SerializedName("cliType") - protected String clientType = null; + protected String clientType = null; @SerializedName("cliIP") - protected String clientIP = null; + protected String clientIP = null; @SerializedName("reqData") - protected String requestData = null; + protected String requestData = null; @SerializedName("agentHost") - protected String agentHostname = null; + protected String agentHostname = null; @SerializedName("logType") - protected String logType = null; + protected String logType = null; @SerializedName("id") - protected String eventId = null; + protected String eventId = null; /** - * This to ensure order within a session. Order not guaranteed across processes and hosts + * This to ensure order within a session. Order not guaranteed across + * processes and hosts */ @SerializedName("seq_num") protected long seqNum = 0; - @SerializedName("freq_count") - protected long frequencyCount = 1; + @SerializedName("event_count") + protected long eventCount = 1; - @SerializedName("freq_dur_ms") - protected long frequencyDurationMS = 0; + @SerializedName("event_dur_ms") + protected long eventDurationMS = 0; public AuthzAuditEvent() { super(); @@ -112,40 +112,29 @@ public class AuthzAuditEvent extends AuditEventBase { this.repositoryType = 0; } - public AuthzAuditEvent(int repositoryType, - String repositoryName, - String user, - Date eventTime, - String accessType, - String resourcePath, - String resourceType, - String action, - short accessResult, - String agentId, - long policyId, - String resultReason, - String aclEnforcer, - String sessionId, - String clientType, - String clientIP, - String requestData) { + public AuthzAuditEvent(int repositoryType, String repositoryName, + String user, Date eventTime, String accessType, + String resourcePath, String resourceType, String action, + short accessResult, String agentId, long policyId, + String resultReason, String aclEnforcer, String sessionId, + String clientType, String clientIP, String requestData) { this.repositoryType = repositoryType; this.repositoryName = repositoryName; - this.user = user; - this.eventTime = eventTime; - this.accessType = accessType; - this.resourcePath = resourcePath; - this.resourceType = resourceType; - this.action = action; - this.accessResult = accessResult; - this.agentId = agentId; - this.policyId = policyId; - this.resultReason = resultReason; - this.aclEnforcer = aclEnforcer; - this.sessionId = sessionId; - this.clientType = clientType; - this.clientIP = clientIP; - this.requestData = requestData; + this.user = user; + this.eventTime = eventTime; + this.accessType = accessType; + this.resourcePath = 
resourcePath; + this.resourceType = resourceType; + this.action = action; + this.accessResult = accessResult; + this.agentId = agentId; + this.policyId = policyId; + this.resultReason = resultReason; + this.aclEnforcer = aclEnforcer; + this.sessionId = sessionId; + this.clientType = clientType; + this.clientIP = clientIP; + this.requestData = requestData; } /** @@ -156,7 +145,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param repositoryType the repositoryType to set + * @param repositoryType + * the repositoryType to set */ public void setRepositoryType(int repositoryType) { this.repositoryType = repositoryType; @@ -170,7 +160,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param repositoryName the repositoryName to set + * @param repositoryName + * the repositoryName to set */ public void setRepositoryName(String repositoryName) { this.repositoryName = repositoryName; @@ -184,7 +175,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param user the user to set + * @param user + * the user to set */ public void setUser(String user) { this.user = user; @@ -198,7 +190,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param timeStamp the timeStamp to set + * @param timeStamp + * the timeStamp to set */ public void setEventTime(Date eventTime) { this.eventTime = eventTime; @@ -212,7 +205,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param accessType the accessType to set + * @param accessType + * the accessType to set */ public void setAccessType(String accessType) { this.accessType = accessType; @@ -226,7 +220,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param resourcePath the resourcePath to set + * @param resourcePath + * the resourcePath to set */ public void setResourcePath(String resourcePath) { this.resourcePath = resourcePath; @@ -240,7 +235,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param resourceType the resourceType to set + * @param resourceType + * the resourceType to set */ public void setResourceType(String resourceType) { this.resourceType = resourceType; @@ -250,11 +246,12 @@ public class AuthzAuditEvent extends AuditEventBase { * @return the action */ public String getAction() { - return trim(action, MAX_ACTION_FIELD_SIZE) ; + return trim(action, MAX_ACTION_FIELD_SIZE); } /** - * @param action the action to set + * @param action + * the action to set */ public void setAction(String action) { this.action = action; @@ -268,7 +265,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param accessResult the accessResult to set + * @param accessResult + * the accessResult to set */ public void setAccessResult(short accessResult) { this.accessResult = accessResult; @@ -282,7 +280,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param agentId the agentId to set + * @param agentId + * the agentId to set */ public void setAgentId(String agentId) { this.agentId = agentId; @@ -296,7 +295,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param policyId the policyId to set + * @param policyId + * the policyId to set */ public void setPolicyId(long policyId) { this.policyId = policyId; @@ -310,7 +310,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param resultReason the resultReason to set + * @param resultReason + * the resultReason to set */ public void setResultReason(String resultReason) { this.resultReason = resultReason; @@ -324,7 +325,8 @@ public 
class AuthzAuditEvent extends AuditEventBase { } /** - * @param aclEnforcer the aclEnforcer to set + * @param aclEnforcer + * the aclEnforcer to set */ public void setAclEnforcer(String aclEnforcer) { this.aclEnforcer = aclEnforcer; @@ -338,7 +340,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param sessionId the sessionId to set + * @param sessionId + * the sessionId to set */ public void setSessionId(String sessionId) { this.sessionId = sessionId; @@ -352,7 +355,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param clientType the clientType to set + * @param clientType + * the clientType to set */ public void setClientType(String clientType) { this.clientType = clientType; @@ -366,7 +370,8 @@ public class AuthzAuditEvent extends AuditEventBase { } /** - * @param clientIP the clientIP to set + * @param clientIP + * the clientIP to set */ public void setClientIP(String clientIP) { this.clientIP = clientIP; @@ -376,11 +381,12 @@ public class AuthzAuditEvent extends AuditEventBase { * @return the requestData */ public String getRequestData() { - return trim(requestData, MAX_REQUEST_DATA_FIELD_SIZE) ; + return trim(requestData, MAX_REQUEST_DATA_FIELD_SIZE); } /** - * @param requestData the requestData to set + * @param requestData + * the requestData to set */ public void setRequestData(String requestData) { this.requestData = requestData; @@ -410,8 +416,6 @@ public class AuthzAuditEvent extends AuditEventBase { this.eventId = eventId; } - - public long getSeqNum() { return seqNum; } @@ -420,20 +424,28 @@ public class AuthzAuditEvent extends AuditEventBase { this.seqNum = seqNum; } - public long getFrequencyCount() { - return frequencyCount; + public long getEventCount() { + return eventCount; } - public void setFrequencyCount(long frequencyCount) { - this.frequencyCount = frequencyCount; + public void setEventCount(long frequencyCount) { + this.eventCount = frequencyCount; } - public long getFrequencyDurationMS() { - return frequencyDurationMS; + public long getEventDurationMS() { + return eventDurationMS; } - public void setFrequencyDurationMS(long frequencyDurationMS) { - this.frequencyDurationMS = frequencyDurationMS; + public void setEventDurationMS(long frequencyDurationMS) { + this.eventDurationMS = frequencyDurationMS; + } + + @Override + public String getEventKey() { + String key = user + "^" + accessType + "^" + resourcePath + "^" + + resourceType + "^" + action + "^" + accessResult + "^" + + sessionId + "^" + clientIP; + return key; } @Override @@ -448,35 +460,42 @@ public class AuthzAuditEvent extends AuditEventBase { } protected StringBuilder toString(StringBuilder sb) { - sb.append("repositoryType=").append(repositoryType).append(FIELD_SEPARATOR) - .append("repositoryName=").append(repositoryName).append(FIELD_SEPARATOR) - .append("user=").append(user).append(FIELD_SEPARATOR) - .append("eventTime=").append(eventTime).append(FIELD_SEPARATOR) - .append("accessType=").append(accessType).append(FIELD_SEPARATOR) - .append("resourcePath=").append(resourcePath).append(FIELD_SEPARATOR) - .append("resourceType=").append(resourceType).append(FIELD_SEPARATOR) - .append("action=").append(action).append(FIELD_SEPARATOR) - .append("accessResult=").append(accessResult).append(FIELD_SEPARATOR) - .append("agentId=").append(agentId).append(FIELD_SEPARATOR) - .append("policyId=").append(policyId).append(FIELD_SEPARATOR) - .append("resultReason=").append(resultReason).append(FIELD_SEPARATOR) - 
.append("aclEnforcer=").append(aclEnforcer).append(FIELD_SEPARATOR) - .append("sessionId=").append(sessionId).append(FIELD_SEPARATOR) - .append("clientType=").append(clientType).append(FIELD_SEPARATOR) - .append("clientIP=").append(clientIP).append(FIELD_SEPARATOR) - .append("requestData=").append(requestData).append(FIELD_SEPARATOR) - .append("agentHostname=").append(agentHostname).append(FIELD_SEPARATOR) - .append("logType=").append(logType).append(FIELD_SEPARATOR) - .append("eventId=").append(eventId).append(FIELD_SEPARATOR) - .append("seq_num=").append(seqNum).append(FIELD_SEPARATOR) - .append("freq_count=").append(frequencyCount).append(FIELD_SEPARATOR) - .append("freq_dur_ms=").append(frequencyDurationMS).append(FIELD_SEPARATOR) - ; + sb.append("repositoryType=").append(repositoryType) + .append(FIELD_SEPARATOR).append("repositoryName=") + .append(repositoryName).append(FIELD_SEPARATOR).append("user=") + .append(user).append(FIELD_SEPARATOR).append("eventTime=") + .append(eventTime).append(FIELD_SEPARATOR) + .append("accessType=").append(accessType) + .append(FIELD_SEPARATOR).append("resourcePath=") + .append(resourcePath).append(FIELD_SEPARATOR) + .append("resourceType=").append(resourceType) + .append(FIELD_SEPARATOR).append("action=").append(action) + .append(FIELD_SEPARATOR).append("accessResult=") + .append(accessResult).append(FIELD_SEPARATOR) + .append("agentId=").append(agentId).append(FIELD_SEPARATOR) + .append("policyId=").append(policyId).append(FIELD_SEPARATOR) + .append("resultReason=").append(resultReason) + .append(FIELD_SEPARATOR).append("aclEnforcer=") + .append(aclEnforcer).append(FIELD_SEPARATOR) + .append("sessionId=").append(sessionId).append(FIELD_SEPARATOR) + .append("clientType=").append(clientType) + .append(FIELD_SEPARATOR).append("clientIP=").append(clientIP) + .append(FIELD_SEPARATOR).append("requestData=") + .append(requestData).append(FIELD_SEPARATOR) + .append("agentHostname=").append(agentHostname) + .append(FIELD_SEPARATOR).append("logType=").append(logType) + .append(FIELD_SEPARATOR).append("eventId=").append(eventId) + .append(FIELD_SEPARATOR).append("seq_num=").append(seqNum) + .append(FIELD_SEPARATOR).append("event_count=") + .append(eventCount).append(FIELD_SEPARATOR) + .append("event_dur_ms=").append(eventDurationMS) + .append(FIELD_SEPARATOR); return sb; } @Override public void persist(DaoManager daoManager) { - daoManager.getAuthzAuditEventDao().create(new AuthzAuditEventDbObj(this)); + daoManager.getAuthzAuditEventDao().create( + new AuthzAuditEventDbObj(this)); } } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java deleted file mode 100644 index 5553bcc..0000000 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ranger.audit.provider; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.concurrent.LinkedTransferQueue; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ranger.audit.model.AuditEventBase; - -/** - * This is a non-blocking queue with no limit on capacity. - */ -public class AuditAsyncQueue extends BaseAuditProvider implements Runnable { - private static final Log logger = LogFactory.getLog(AuditAsyncQueue.class); - - LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>(); - Thread consumerThread = null; - - static int threadCount = 0; - static final String DEFAULT_NAME = "async"; - - public AuditAsyncQueue() { - setName(DEFAULT_NAME); - } - - public AuditAsyncQueue(AuditProvider consumer) { - super(consumer); - setName(DEFAULT_NAME); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger. - * audit.model.AuditEventBase) - */ - @Override - public boolean log(AuditEventBase event) { - // Add to the queue and return ASAP - if (queue.size() >= getMaxQueueSize()) { - return false; - } - queue.add(event); - addLifeTimeInLogCount(1); - return true; - } - - @Override - public boolean log(Collection<AuditEventBase> events) { - for (AuditEventBase event : events) { - log(event); - } - return true; - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#start() - */ - @Override - public void start() { - if(consumer != null) { - consumer.start(); - } - - consumerThread = new Thread(this, this.getClass().getName() - + (threadCount++)); - consumerThread.setDaemon(true); - consumerThread.start(); - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#stop() - */ - @Override - public void stop() { - setDrain(true); - try { - consumerThread.interrupt(); - } catch (Throwable t) { - // ignore any exception - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() - */ - @Override - public boolean isFlushPending() { - if (queue.isEmpty()) { - return consumer.isFlushPending(); - } - return true; - } - - /* - * (non-Javadoc) - * - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - while (true) { - try { - AuditEventBase event = null; - if (!isDrain()) { - // For Transfer queue take() is blocking - event = queue.take(); - } else { - // For Transfer queue poll() is non blocking - event = queue.poll(); - } - if (event != null) { - Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>(); - eventList.add(event); - // TODO: Put a limit. Hard coding to 1000 (use batch size - // property) - queue.drainTo(eventList, 1000 - 1); - consumer.log(eventList); - eventList.clear(); - } - } catch (InterruptedException e) { - logger.info( - "Caught exception in consumer thread. 
Mostly to about loop", - e); - } catch (Throwable t) { - logger.error("Caught error during processing request.", t); - } - if (isDrain() && queue.isEmpty()) { - break; - } - } - try { - // Call stop on the consumer - consumer.stop(); - } catch (Throwable t) { - logger.error("Error while calling stop on consumer.", t); - } - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java deleted file mode 100644 index 58d122a..0000000 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java +++ /dev/null @@ -1,327 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -package org.apache.ranger.audit.provider; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Properties; -import java.util.concurrent.ArrayBlockingQueue; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ranger.audit.model.AuditEventBase; - -public class AuditBatchProcessor extends BaseAuditProvider implements Runnable { - private static final Log logger = LogFactory - .getLog(AuditBatchProcessor.class); - - private BlockingQueue<AuditEventBase> queue = null; - private Collection<AuditEventBase> localBatchBuffer = new ArrayList<AuditEventBase>(); - - Thread consumerThread = null; - static int threadCount = 0; - - public AuditBatchProcessor() { - } - - public AuditBatchProcessor(AuditProvider consumer) { - super(consumer); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger. - * audit.model.AuditEventBase) - */ - @Override - public boolean log(AuditEventBase event) { - // Add to batchQueue. 
Block if full - queue.add(event); - addLifeTimeInLogCount(1); - return true; - } - - @Override - public boolean log(Collection<AuditEventBase> events) { - for (AuditEventBase event : events) { - log(event); - } - return true; - } - - @Override - public void init(Properties prop, String basePropertyName) { - String propPrefix = "xasecure.audit.batch"; - if (basePropertyName != null) { - propPrefix = basePropertyName; - } - - super.init(prop, propPrefix); - - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#start() - */ - @Override - synchronized public void start() { - if (consumerThread != null) { - logger.error("Provider is already started. name=" + getName()); - return; - } - logger.info("Creating ArrayBlockingQueue with maxSize=" - + getMaxQueueSize()); - queue = new ArrayBlockingQueue<AuditEventBase>(getMaxQueueSize()); - - // Start the consumer first - consumer.start(); - - // Then the FileSpooler - if (fileSpoolerEnabled) { - fileSpooler.start(); - } - - // Finally the queue listener - consumerThread = new Thread(this, this.getClass().getName() - + (threadCount++)); - consumerThread.setDaemon(true); - consumerThread.start(); - - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#stop() - */ - @Override - public void stop() { - setDrain(true); - flush(); - try { - consumerThread.interrupt(); - } catch (Throwable t) { - // ignore any exception - } - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete() - */ - @Override - public void waitToComplete() { - int defaultTimeOut = -1; - waitToComplete(defaultTimeOut); - consumer.waitToComplete(defaultTimeOut); - } - - @Override - public void waitToComplete(long timeout) { - setDrain(true); - flush(); - long sleepTime = 1000; - long startTime = System.currentTimeMillis(); - int prevQueueSize = -1; - int staticLoopCount = 0; - while ((queue.size() > 0 || localBatchBuffer.size() > 0)) { - if (prevQueueSize == queue.size()) { - logger.error("Queue size is not changing. " + getName() - + ".size=" + queue.size()); - staticLoopCount++; - if (staticLoopCount > 5) { - logger.error("Aborting writing to consumer. Some logs will be discarded." 
- + getName() + ".size=" + queue.size()); - } - } else { - staticLoopCount = 0; - } - consumerThread.interrupt(); - try { - Thread.sleep(sleepTime); - if (timeout > 0 - && (System.currentTimeMillis() - startTime > timeout)) { - break; - } - } catch (InterruptedException e) { - break; - } - } - consumer.waitToComplete(timeout); - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() - */ - @Override - public boolean isFlushPending() { - if (queue.isEmpty()) { - return consumer.isFlushPending(); - } - return true; - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#flush() - */ - @Override - public void flush() { - if (fileSpoolerEnabled) { - fileSpooler.flush(); - } - consumer.flush(); - } - - /* - * (non-Javadoc) - * - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - long lastDispatchTime = System.currentTimeMillis(); - boolean isDestActive = true; - while (true) { - // Time to next dispatch - long nextDispatchDuration = lastDispatchTime - - System.currentTimeMillis() + getMaxBatchInterval(); - - boolean isToSpool = false; - boolean fileSpoolDrain = false; - try { - if (fileSpoolerEnabled && fileSpooler.isPending()) { - int percentUsed = (getMaxQueueSize() - queue.size()) * 100 - / getMaxQueueSize(); - long lastAttemptDelta = fileSpooler - .getLastAttemptTimeDelta(); - - fileSpoolDrain = lastAttemptDelta > fileSpoolMaxWaitTime; - // If we should even read from queue? - if (!isDrain() && !fileSpoolDrain - && percentUsed < fileSpoolDrainThresholdPercent) { - // Since some files are still under progress and it is - // not in drain mode, lets wait and retry - if (nextDispatchDuration > 0) { - Thread.sleep(nextDispatchDuration); - } - continue; - } - isToSpool = true; - } - - AuditEventBase event = null; - - if (!isToSpool && !isDrain() && !fileSpoolDrain - && nextDispatchDuration > 0) { - event = queue.poll(nextDispatchDuration, - TimeUnit.MILLISECONDS); - - } else { - // For poll() is non blocking - event = queue.poll(); - } - if (event != null) { - localBatchBuffer.add(event); - if (getMaxBatchSize() >= localBatchBuffer.size()) { - queue.drainTo(localBatchBuffer, getMaxBatchSize() - - localBatchBuffer.size()); - } - } - } catch (InterruptedException e) { - logger.info( - "Caught exception in consumer thread. Mostly to abort loop", - e); - } catch (Throwable t) { - logger.error("Caught error during processing request.", t); - } - - if (localBatchBuffer.size() > 0 && isToSpool) { - // Let spool to the file directly - if (isDestActive) { - logger.info("Switching to file spool. Queue=" + getName() - + ", dest=" + consumer.getName()); - } - isDestActive = false; - fileSpooler.stashLogs(localBatchBuffer); - localBatchBuffer.clear(); - // Reset all variables - lastDispatchTime = System.currentTimeMillis(); - } else if (localBatchBuffer.size() > 0 - && (isDrain() - || localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) { - if (fileSpoolerEnabled && !isDestActive) { - logger.info("Switching to writing to destination. Queue=" - + getName() + ", dest=" + consumer.getName()); - } - boolean ret = consumer.log(localBatchBuffer); - if (!ret) { - if (fileSpoolerEnabled) { - logger.info("Switching to file spool. Queue=" - + getName() + ", dest=" + consumer.getName()); - // Transient error. 
Stash and move on - fileSpooler.stashLogs(localBatchBuffer); - isDestActive = false; - } else { - // We need to drop this event - logFailedEvent(localBatchBuffer, null); - } - } else { - isDestActive = true; - } - localBatchBuffer.clear(); - // Reset all variables - lastDispatchTime = System.currentTimeMillis(); - } - - if (isDrain()) { - if (!queue.isEmpty() || localBatchBuffer.size() > 0) { - logger.info("Queue is not empty. Will retry. queue.size)=" - + queue.size() + ", localBatchBuffer.size()=" - + localBatchBuffer.size()); - } else { - break; - } - } - } - - logger.info("Exiting consumerThread. Queue=" + getName() + ", dest=" - + consumer.getName()); - try { - // Call stop on the consumer - consumer.stop(); - if (fileSpoolerEnabled) { - fileSpooler.stop(); - } - } catch (Throwable t) { - logger.error("Error while calling stop on consumer.", t); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java deleted file mode 100644 index 11c32ca..0000000 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java +++ /dev/null @@ -1,70 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.ranger.audit.provider; - -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; - -/** - * This class needs to be extended by anyone who wants to build custom - * destination - */ -public abstract class AuditDestination extends BaseAuditProvider { - private static final Log logger = LogFactory.getLog(AuditDestination.class); - - public AuditDestination() { - logger.info("AuditDestination() enter"); - } - - /* - * (non-Javadoc) - * - * @see - * org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties, - * java.lang.String) - */ - @Override - public void init(Properties prop, String basePropertyName) { - super.init(prop, basePropertyName); - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() - */ - @Override - public boolean isFlushPending() { - return false; - } - - /* - * (non-Javadoc) - * - * @see org.apache.ranger.audit.provider.AuditProvider#flush() - */ - @Override - public void flush() { - - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java deleted file mode 100644 index 8b006de..0000000 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java +++ /dev/null @@ -1,875 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -package org.apache.ranger.audit.provider; - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileFilter; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.FileWriter; -import java.io.IOException; -import java.io.PrintWriter; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Date; -import java.util.Iterator; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedTransferQueue; -import java.util.concurrent.TimeUnit; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.ranger.audit.model.AuditEventBase; - -import com.google.gson.Gson; -import com.google.gson.GsonBuilder; - -/** - * This class temporarily stores logs in file system if the destination is - * overloaded or down - */ -public class AuditFileSpool implements Runnable { - private static final Log logger = LogFactory.getLog(AuditFileSpool.class); - - public enum SPOOL_FILE_STATUS { - pending, write_inprogress, read_inprogress, done - } - - public static final String PROP_FILE_SPOOL_LOCAL_DIR = "filespool.dir"; - public static final String PROP_FILE_SPOOL_LOCAL_FILE_NAME = "filespool.filename.format"; - public static final String PROP_FILE_SPOOL_ARCHIVE_DIR = "filespool.archive.dir"; - public static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = "filespool.archive.max.files"; - public static final String PROP_FILE_SPOOL_FILENAME_PREFIX = "filespool.file.prefix"; - public static final String PROP_FILE_SPOOL_FILE_ROLLOVER = "filespool.file.rollover.sec"; - public static final String PROP_FILE_SPOOL_INDEX_FILE = "filespool.index.filename"; - // public static final String PROP_FILE_SPOOL_INDEX_DONE_FILE = - // "filespool.index.done_filename"; - public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = "filespool.destination.retry.ms"; - - AuditProvider queueProvider = null; - AuditProvider consumerProvider = null; - - BlockingQueue<AuditIndexRecord> indexQueue = new LinkedTransferQueue<AuditIndexRecord>(); - - // Folder and File attributes - File logFolder = null; - String logFileNameFormat = null; - File archiveFolder = null; - String fileNamePrefix = null; - String indexFileName = null; - File indexFile = null; - String indexDoneFileName = null; - File indexDoneFile = null; - int retryDestinationMS = 30 * 1000; // Default 30 seconds - int fileRolloverSec = 24 * 60 * 60; // In seconds - int maxArchiveFiles = 100; - - int errorLogIntervalMS = 30 * 1000; // Every 30 seconds - long lastErrorLogMS = 0; - - List<AuditIndexRecord> indexRecords = new ArrayList<AuditIndexRecord>(); - - boolean isPending = false; - long lastAttemptTime = 0; - boolean initDone = false; - - PrintWriter logWriter = null; - AuditIndexRecord currentWriterIndexRecord = null; - AuditIndexRecord currentConsumerIndexRecord = null; - - BufferedReader logReader = null; - - Thread destinationThread = null; - - boolean isWriting = true; - boolean isDrain = false; - boolean isDestDown = true; - - private static Gson gson = null; - - public AuditFileSpool(AuditProvider queueProvider, - AuditProvider consumerProvider) { - this.queueProvider = queueProvider; - this.consumerProvider = consumerProvider; - } - - public void init(Properties prop) { - init(prop, null); - } - - public void init(Properties props, String basePropertyName) { - if (initDone) { - logger.error("init() called more than once. 
queueProvider=" - + queueProvider.getName() + ", consumerProvider=" - + consumerProvider.getName()); - return; - } - String propPrefix = "xasecure.audit.filespool"; - if (basePropertyName != null) { - propPrefix = basePropertyName; - } - - try { - gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS") - .create(); - - // Initial folder and file properties - String logFolderProp = MiscUtil.getStringProperty(props, propPrefix - + "." + PROP_FILE_SPOOL_LOCAL_DIR); - logFileNameFormat = MiscUtil.getStringProperty(props, - basePropertyName + "." + PROP_FILE_SPOOL_LOCAL_FILE_NAME); - String archiveFolderProp = MiscUtil.getStringProperty(props, - propPrefix + "." + PROP_FILE_SPOOL_ARCHIVE_DIR); - fileNamePrefix = MiscUtil.getStringProperty(props, propPrefix + "." - + PROP_FILE_SPOOL_FILENAME_PREFIX); - indexFileName = MiscUtil.getStringProperty(props, propPrefix + "." - + PROP_FILE_SPOOL_INDEX_FILE); - retryDestinationMS = MiscUtil.getIntProperty(props, propPrefix - + "." + PROP_FILE_SPOOL_DEST_RETRY_MS, retryDestinationMS); - fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "." - + PROP_FILE_SPOOL_FILE_ROLLOVER, fileRolloverSec); - maxArchiveFiles = MiscUtil.getIntProperty(props, propPrefix + "." - + PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, maxArchiveFiles); - - logger.info("retryDestinationMS=" + retryDestinationMS - + ", queueName=" + queueProvider.getName()); - logger.info("fileRolloverSec=" + fileRolloverSec + ", queueName=" - + queueProvider.getName()); - logger.info("maxArchiveFiles=" + maxArchiveFiles + ", queueName=" - + queueProvider.getName()); - - if (logFolderProp == null || logFolderProp.isEmpty()) { - logger.error("Audit spool folder is not configured. Please set " - + propPrefix - + "." - + PROP_FILE_SPOOL_LOCAL_DIR - + ". queueName=" + queueProvider.getName()); - return; - } - logFolder = new File(logFolderProp); - if (!logFolder.isDirectory()) { - logFolder.mkdirs(); - if (!logFolder.isDirectory()) { - logger.error("File Spool folder not found and can't be created. folder=" - + logFolder.getAbsolutePath() - + ", queueName=" - + queueProvider.getName()); - return; - } - } - logger.info("logFolder=" + logFolder + ", queueName=" - + queueProvider.getName()); - - if (logFileNameFormat == null || logFileNameFormat.isEmpty()) { - logFileNameFormat = "spool_" + "%app-type%" + "_" - + "%time:yyyyMMdd-HHmm.ss%.log"; - } - logger.info("logFileNameFormat=" + logFileNameFormat - + ", queueName=" + queueProvider.getName()); - - if (archiveFolderProp == null || archiveFolderProp.isEmpty()) { - archiveFolder = new File(logFolder, "archive"); - } else { - archiveFolder = new File(archiveFolderProp); - } - if (!archiveFolder.isDirectory()) { - archiveFolder.mkdirs(); - if (!archiveFolder.isDirectory()) { - logger.error("File Spool archive folder not found and can't be created. 
folder=" - + archiveFolder.getAbsolutePath() - + ", queueName=" - + queueProvider.getName()); - return; - } - } - logger.info("archiveFolder=" + archiveFolder + ", queueName=" - + queueProvider.getName()); - - if (indexFileName == null || indexFileName.isEmpty()) { - indexFileName = "index_" + fileNamePrefix + ".json"; - } - - indexFile = new File(logFolder, indexFileName); - if (!indexFile.exists()) { - indexFile.createNewFile(); - } - logger.info("indexFile=" + indexFile + ", queueName=" - + queueProvider.getName()); - - int lastDot = indexFileName.lastIndexOf('.'); - indexDoneFileName = indexFileName.substring(0, lastDot) - + "_closed.json"; - indexDoneFile = new File(logFolder, indexDoneFileName); - if (!indexDoneFile.exists()) { - indexDoneFile.createNewFile(); - } - logger.info("indexDoneFile=" + indexDoneFile + ", queueName=" - + queueProvider.getName()); - - // Load index file - loadIndexFile(); - for (AuditIndexRecord auditIndexRecord : indexRecords) { - if (!auditIndexRecord.status.equals(SPOOL_FILE_STATUS.done)) { - isPending = true; - } - if (auditIndexRecord.status - .equals(SPOOL_FILE_STATUS.write_inprogress)) { - currentWriterIndexRecord = auditIndexRecord; - logger.info("currentWriterIndexRecord=" - + currentWriterIndexRecord.filePath - + ", queueName=" + queueProvider.getName()); - } - if (auditIndexRecord.status - .equals(SPOOL_FILE_STATUS.read_inprogress)) { - indexQueue.add(auditIndexRecord); - } - } - printIndex(); - // One more loop to add the rest of the pending records in reverse - // order - for (int i = 0; i < indexRecords.size(); i++) { - AuditIndexRecord auditIndexRecord = indexRecords.get(i); - if (auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) { - File consumerFile = new File(auditIndexRecord.filePath); - if (!consumerFile.exists()) { - logger.error("INIT: Consumer file=" - + consumerFile.getPath() + " not found."); - System.exit(1); - } - indexQueue.add(auditIndexRecord); - } - } - - } catch (Throwable t) { - logger.fatal("Error initializing File Spooler. queue=" - + queueProvider.getName(), t); - return; - } - initDone = true; - } - - /** - * Start looking for outstanding logs and update status according. - */ - public void start() { - if (!initDone) { - logger.error("Cannot start Audit File Spooler. Initilization not done yet. queueName=" - + queueProvider.getName()); - return; - } - - logger.info("Starting writerThread, queueName=" - + queueProvider.getName() + ", consumer=" - + consumerProvider.getName()); - - // Let's start the thread to read - destinationThread = new Thread(this, queueProvider.getName() - + "_destWriter"); - destinationThread.setDaemon(true); - destinationThread.start(); - } - - public void stop() { - if (!initDone) { - logger.error("Cannot stop Audit File Spooler. Initilization not done. 
queueName=" - + queueProvider.getName()); - return; - } - logger.info("Stop called, queueName=" + queueProvider.getName() - + ", consumer=" + consumerProvider.getName()); - - isDrain = true; - flush(); - - PrintWriter out = getOpenLogFileStream(); - if (out != null) { - // If write is still going on, then let's give it enough time to - // complete - for (int i = 0; i < 3; i++) { - if (isWriting) { - try { - Thread.sleep(1000); - } catch (InterruptedException e) { - // ignore - } - continue; - } - try { - logger.info("Closing open file, queueName=" - + queueProvider.getName() + ", consumer=" - + consumerProvider.getName()); - - out.flush(); - out.close(); - } catch (Throwable t) { - logger.debug("Error closing spool out file.", t); - } - } - } - try { - destinationThread.interrupt(); - } catch (Throwable e) { - // ignore - } - } - - public void flush() { - if (!initDone) { - logger.error("Cannot flush Audit File Spooler. Initilization not done. queueName=" - + queueProvider.getName()); - return; - } - PrintWriter out = getOpenLogFileStream(); - if (out != null) { - out.flush(); - } - } - - /** - * If any files are still not processed. Also, if the destination is not - * reachable - * - * @return - */ - public boolean isPending() { - if (!initDone) { - logError("isPending(): File Spooler not initialized. queueName=" - + queueProvider.getName()); - return false; - } - - return isPending; - } - - /** - * Milliseconds from last attempt time - * - * @return - */ - public long getLastAttemptTimeDelta() { - if (lastAttemptTime == 0) { - return 0; - } - return System.currentTimeMillis() - lastAttemptTime; - } - - synchronized public void stashLogs(AuditEventBase event) { - if (isDrain) { - // Stop has been called, so this method shouldn't be called - logger.error("stashLogs() is called after stop is called. event=" - + event); - return; - } - try { - isWriting = true; - PrintWriter logOut = getLogFileStream(); - // Convert event to json - String jsonStr = MiscUtil.stringify(event); - logOut.println(jsonStr); - isPending = true; - } catch (Exception ex) { - logger.error("Error writing to file. event=" + event, ex); - } finally { - isWriting = false; - } - - } - - synchronized public void stashLogs(Collection<AuditEventBase> events) { - for (AuditEventBase event : events) { - stashLogs(event); - } - flush(); - } - - synchronized public void stashLogsString(String event) { - if (isDrain) { - // Stop has been called, so this method shouldn't be called - logger.error("stashLogs() is called after stop is called. event=" - + event); - return; - } - try { - isWriting = true; - PrintWriter logOut = getLogFileStream(); - logOut.println(event); - } catch (Exception ex) { - logger.error("Error writing to file. event=" + event, ex); - } finally { - isWriting = false; - } - - } - - synchronized public void stashLogsString(Collection<String> events) { - for (String event : events) { - stashLogsString(event); - } - flush(); - } - - /** - * This return the current file. 
If there are not current open output file, - * then it will return null - * - * @return - * @throws Exception - */ - synchronized private PrintWriter getOpenLogFileStream() { - return logWriter; - } - - /** - * @return - * @throws Exception - */ - synchronized private PrintWriter getLogFileStream() throws Exception { - closeFileIfNeeded(); - - // Either there are no open log file or the previous one has been rolled - // over - if (currentWriterIndexRecord == null) { - Date currentTime = new Date(); - // Create a new file - String fileName = MiscUtil.replaceTokens(logFileNameFormat, - currentTime.getTime()); - String newFileName = fileName; - File outLogFile = null; - int i = 0; - while (true) { - outLogFile = new File(logFolder, newFileName); - File archiveLogFile = new File(archiveFolder, newFileName); - if (!outLogFile.exists() && !archiveLogFile.exists()) { - break; - } - i++; - int lastDot = fileName.lastIndexOf('.'); - String baseName = fileName.substring(0, lastDot); - String extension = fileName.substring(lastDot); - newFileName = baseName + "." + i + extension; - } - fileName = newFileName; - logger.info("Creating new file. queueName=" - + queueProvider.getName() + ", fileName=" + fileName); - // Open the file - logWriter = new PrintWriter(new BufferedWriter(new FileWriter( - outLogFile))); - - AuditIndexRecord tmpIndexRecord = new AuditIndexRecord(); - - tmpIndexRecord.id = MiscUtil.generateUniqueId(); - tmpIndexRecord.filePath = outLogFile.getPath(); - tmpIndexRecord.status = SPOOL_FILE_STATUS.write_inprogress; - tmpIndexRecord.fileCreateTime = currentTime; - tmpIndexRecord.lastAttempt = true; - currentWriterIndexRecord = tmpIndexRecord; - indexRecords.add(currentWriterIndexRecord); - saveIndexFile(); - - } else { - if (logWriter == null) { - // This means the process just started. We need to open the file - // in append mode. - logger.info("Opening existing file for append. queueName=" - + queueProvider.getName() + ", fileName=" - + currentWriterIndexRecord.filePath); - logWriter = new PrintWriter(new BufferedWriter(new FileWriter( - currentWriterIndexRecord.filePath, true))); - } - } - return logWriter; - } - - synchronized private void closeFileIfNeeded() throws FileNotFoundException, - IOException { - // Is there file open to write or there are no pending file, then close - // the active file - if (currentWriterIndexRecord != null) { - // Check whether the file needs to rolled - boolean closeFile = false; - if (indexRecords.size() == 1) { - closeFile = true; - logger.info("Closing file. Only one open file. queueName=" - + queueProvider.getName() + ", fileName=" - + currentWriterIndexRecord.filePath); - } else if (System.currentTimeMillis() - - currentWriterIndexRecord.fileCreateTime.getTime() > fileRolloverSec * 1000) { - closeFile = true; - logger.info("Closing file. Rolling over. queueName=" - + queueProvider.getName() + ", fileName=" - + currentWriterIndexRecord.filePath); - } - if (closeFile) { - // Roll the file - if (logWriter != null) { - logWriter.flush(); - logWriter.close(); - logWriter = null; - } - currentWriterIndexRecord.status = SPOOL_FILE_STATUS.pending; - currentWriterIndexRecord.writeCompleteTime = new Date(); - saveIndexFile(); - logger.info("Adding file to queue. 
queueName=" - + queueProvider.getName() + ", fileName=" - + currentWriterIndexRecord.filePath); - indexQueue.add(currentWriterIndexRecord); - currentWriterIndexRecord = null; - } - } - } - - /** - * Load the index file - * - * @throws IOException - */ - void loadIndexFile() throws IOException { - logger.info("Loading index file. fileName=" + indexFile.getPath()); - BufferedReader br = new BufferedReader(new FileReader(indexFile)); - indexRecords.clear(); - String line; - while ((line = br.readLine()) != null) { - if (!line.isEmpty() && !line.startsWith("#")) { - AuditIndexRecord record = gson.fromJson(line, - AuditIndexRecord.class); - indexRecords.add(record); - } - } - br.close(); - } - - synchronized void printIndex() { - logger.info("INDEX printIndex() ==== START"); - Iterator<AuditIndexRecord> iter = indexRecords.iterator(); - while (iter.hasNext()) { - AuditIndexRecord record = iter.next(); - logger.info("INDEX=" + record + ", isFileExist=" - + (new File(record.filePath).exists())); - } - logger.info("INDEX printIndex() ==== END"); - } - - synchronized void removeIndexRecord(AuditIndexRecord indexRecord) - throws FileNotFoundException, IOException { - Iterator<AuditIndexRecord> iter = indexRecords.iterator(); - while (iter.hasNext()) { - AuditIndexRecord record = iter.next(); - if (record.id.equals(indexRecord.id)) { - logger.info("Removing file from index. file=" + record.filePath - + ", queueName=" + queueProvider.getName() - + ", consumer=" + consumerProvider.getName()); - - iter.remove(); - appendToDoneFile(record); - } - } - saveIndexFile(); - } - - synchronized void saveIndexFile() throws FileNotFoundException, IOException { - PrintWriter out = new PrintWriter(indexFile); - for (AuditIndexRecord auditIndexRecord : indexRecords) { - out.println(gson.toJson(auditIndexRecord)); - } - out.close(); - // printIndex(); - - } - - void appendToDoneFile(AuditIndexRecord indexRecord) - throws FileNotFoundException, IOException { - logger.info("Moving to done file. " + indexRecord.filePath - + ", queueName=" + queueProvider.getName() + ", consumer=" - + consumerProvider.getName()); - String line = gson.toJson(indexRecord); - PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter( - indexDoneFile, true))); - out.println(line); - out.flush(); - out.close(); - - // Move to archive folder - File logFile = null; - File archiveFile = null; - try { - logFile = new File(indexRecord.filePath); - String fileName = logFile.getName(); - archiveFile = new File(archiveFolder, fileName); - logger.info("Moving logFile " + logFile + " to " + archiveFile); - logFile.renameTo(archiveFile); - } catch (Throwable t) { - logger.error("Error moving log file to archive folder. 
logFile=" - + logFile + ", archiveFile=" + archiveFile, t); - } - - archiveFile = null; - try { - // Remove old files - File[] logFiles = archiveFolder.listFiles(new FileFilter() { - public boolean accept(File pathname) { - return pathname.getName().toLowerCase().endsWith(".log"); - } - }); - - if (logFiles.length > maxArchiveFiles) { - int filesToDelete = logFiles.length - maxArchiveFiles; - BufferedReader br = new BufferedReader(new FileReader( - indexDoneFile)); - try { - int filesDeletedCount = 0; - while ((line = br.readLine()) != null) { - if (!line.isEmpty() && !line.startsWith("#")) { - AuditIndexRecord record = gson.fromJson(line, - AuditIndexRecord.class); - logFile = new File(record.filePath); - String fileName = logFile.getName(); - archiveFile = new File(archiveFolder, fileName); - if (archiveFile.exists()) { - logger.info("Deleting archive file " - + archiveFile); - boolean ret = archiveFile.delete(); - if (!ret) { - logger.error("Error deleting archive file. archiveFile=" - + archiveFile); - } - filesDeletedCount++; - if (filesDeletedCount >= filesToDelete) { - logger.info("Deleted " + filesDeletedCount - + " files"); - break; - } - } - } - } - } finally { - br.close(); - } - } - } catch (Throwable t) { - logger.error("Error deleting older archive file. archiveFile=" - + archiveFile, t); - } - - } - - void logError(String msg) { - long currTimeMS = System.currentTimeMillis(); - if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) { - logger.error(msg); - lastErrorLogMS = currTimeMS; - } - } - - class AuditIndexRecord { - String id; - String filePath; - int linePosition = 0; - SPOOL_FILE_STATUS status = SPOOL_FILE_STATUS.write_inprogress; - Date fileCreateTime; - Date writeCompleteTime; - Date doneCompleteTime; - Date lastSuccessTime; - Date lastFailedTime; - int failedAttemptCount = 0; - boolean lastAttempt = false; - - @Override - public String toString() { - return "AuditIndexRecord [id=" + id + ", filePath=" + filePath - + ", linePosition=" + linePosition + ", status=" + status - + ", fileCreateTime=" + fileCreateTime - + ", writeCompleteTime=" + writeCompleteTime - + ", doneCompleteTime=" + doneCompleteTime - + ", lastSuccessTime=" + lastSuccessTime - + ", lastFailedTime=" + lastFailedTime - + ", failedAttemptCount=" + failedAttemptCount - + ", lastAttempt=" + lastAttempt + "]"; - } - - } - - class AuditFileSpoolAttempt { - Date attemptTime; - String status; - } - - /* - * (non-Javadoc) - * - * @see java.lang.Runnable#run() - */ - @Override - public void run() { - while (true) { - try { - // Let's pause between each iteration - if (currentConsumerIndexRecord == null) { - currentConsumerIndexRecord = indexQueue.poll( - retryDestinationMS, TimeUnit.MILLISECONDS); - } else { - Thread.sleep(retryDestinationMS); - } - - if (isDrain) { - // Need to exit - break; - } - if (currentConsumerIndexRecord == null) { - closeFileIfNeeded(); - continue; - } - - boolean isRemoveIndex = false; - File consumerFile = new File( - currentConsumerIndexRecord.filePath); - if (!consumerFile.exists()) { - logger.error("Consumer file=" + consumerFile.getPath() - + " not found."); - printIndex(); - isRemoveIndex = true; - } else { - // Let's open the file to write - BufferedReader br = new BufferedReader(new FileReader( - currentConsumerIndexRecord.filePath)); - try { - int startLine = currentConsumerIndexRecord.linePosition; - String line; - int currLine = 0; - boolean isResumed = false; - List<String> lines = new ArrayList<String>(); - while ((line = br.readLine()) != null) { - currLine++; - if 
(currLine < startLine) { - continue; - } - lines.add(line); - if (lines.size() == queueProvider.getMaxBatchSize()) { - boolean ret = sendEvent(lines, - currentConsumerIndexRecord, currLine); - if (!ret) { - throw new Exception("Destination down"); - } else { - if (!isResumed) { - logger.info("Started writing to destination. file=" - + currentConsumerIndexRecord.filePath - + ", queueName=" - + queueProvider.getName() - + ", consumer=" - + consumerProvider.getName()); - } - } - lines.clear(); - } - } - if (lines.size() > 0) { - boolean ret = sendEvent(lines, - currentConsumerIndexRecord, currLine); - if (!ret) { - throw new Exception("Destination down"); - } else { - if (!isResumed) { - logger.info("Started writing to destination. file=" - + currentConsumerIndexRecord.filePath - + ", queueName=" - + queueProvider.getName() - + ", consumer=" - + consumerProvider.getName()); - } - } - lines.clear(); - } - logger.info("Done reading file. file=" - + currentConsumerIndexRecord.filePath - + ", queueName=" + queueProvider.getName() - + ", consumer=" + consumerProvider.getName()); - // The entire file is read - currentConsumerIndexRecord.status = SPOOL_FILE_STATUS.done; - currentConsumerIndexRecord.doneCompleteTime = new Date(); - currentConsumerIndexRecord.lastAttempt = true; - - isRemoveIndex = true; - } catch (Exception ex) { - isDestDown = true; - logError("Destination down. queueName=" - + queueProvider.getName() + ", consumer=" - + consumerProvider.getName()); - lastAttemptTime = System.currentTimeMillis(); - // Update the index file - currentConsumerIndexRecord.lastFailedTime = new Date(); - currentConsumerIndexRecord.failedAttemptCount++; - currentConsumerIndexRecord.lastAttempt = false; - saveIndexFile(); - } finally { - br.close(); - } - } - if (isRemoveIndex) { - // Remove this entry from index - removeIndexRecord(currentConsumerIndexRecord); - currentConsumerIndexRecord = null; - closeFileIfNeeded(); - } - } catch (Throwable t) { - logger.error("Exception in destination writing thread.", t); - } - } - logger.info("Exiting file spooler. provider=" + queueProvider.getName() - + ", consumer=" + consumerProvider.getName()); - } - - private boolean sendEvent(List<String> lines, AuditIndexRecord indexRecord, - int currLine) { - boolean ret = true; - try { - ret = consumerProvider.logJSON(lines); - if (!ret) { - // Need to log error after fixed interval - logError("Error sending logs to consumer. provider=" - + queueProvider.getName() + ", consumer=" - + consumerProvider.getName()); - } else { - // Update index and save - indexRecord.linePosition = currLine; - indexRecord.status = SPOOL_FILE_STATUS.read_inprogress; - indexRecord.lastSuccessTime = new Date(); - indexRecord.lastAttempt = true; - saveIndexFile(); - - if (isDestDown) { - isDestDown = false; - logger.info("Destination up now. " + indexRecord.filePath - + ", queueName=" + queueProvider.getName() - + ", consumer=" + consumerProvider.getName()); - } - } - } catch (Throwable t) { - logger.error("Error while sending logs to consumer. 
provider=" - + queueProvider.getName() + ", consumer=" - + consumerProvider.getName() + ", log=" + lines, t); - } - - return ret; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java index 13b3142..a67f7e0 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java @@ -24,9 +24,14 @@ import java.util.Properties; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.destination.FileAuditDestination; +import org.apache.ranger.audit.destination.HDFSAuditDestination; import org.apache.ranger.audit.provider.hdfs.HdfsAuditProvider; import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider; import org.apache.ranger.audit.provider.solr.SolrAuditProvider; +import org.apache.ranger.audit.queue.AuditAsyncQueue; +import org.apache.ranger.audit.queue.AuditBatchQueue; +import org.apache.ranger.audit.queue.AuditSummaryQueue; /* * TODO: @@ -90,10 +95,11 @@ public class AuditProviderFactory { LOG.info("AuditProviderFactory: initializing.."); if (mInitDone) { - LOG.warn("AuditProviderFactory.init(): already initialized!", + LOG.warn( + "AuditProviderFactory.init(): already initialized! Will try to re-initialize", new Exception()); - return; + // return; } mInitDone = true; @@ -125,7 +131,7 @@ public class AuditProviderFactory { for (Object propNameObj : props.keySet()) { String propName = propNameObj.toString(); - if (propName.length() <= AUDIT_DEST_BASE.length() + 1) { + if (!propName.startsWith(AUDIT_DEST_BASE)) { continue; } String destName = propName.substring(AUDIT_DEST_BASE.length() + 1); @@ -152,9 +158,14 @@ public class AuditProviderFactory { String queueName = MiscUtil.getStringProperty(props, destPropPrefix + "." + BaseAuditProvider.PROP_QUEUE); - if( queueName == null || queueName.isEmpty()) { + if (queueName == null || queueName.isEmpty()) { + LOG.info(destPropPrefix + "." + + BaseAuditProvider.PROP_QUEUE + + " is not set. Setting queue to batch for " + + destName); queueName = "batch"; } + LOG.info("queue for " + destName + " is " + queueName); if (queueName != null && !queueName.isEmpty() && !queueName.equalsIgnoreCase("none")) { String queuePropPrefix = destPropPrefix + "." + queueName; @@ -184,24 +195,55 @@ public class AuditProviderFactory { } } if (providers.size() > 0) { - LOG.info("Using v2 audit configuration"); + LOG.info("Using v3 audit configuration"); AuditAsyncQueue asyncQueue = new AuditAsyncQueue(); - String propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX + "." + "async"; + String propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX + "." + + "async"; asyncQueue.init(props, propPrefix); + propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX; + boolean summaryEnabled = MiscUtil.getBooleanProperty(props, + propPrefix + "." + "summary" + "." 
+ "enabled", false); + AuditSummaryQueue summaryQueue = null; + if (summaryEnabled) { + LOG.info("AuditSummaryQueue is enabled"); + summaryQueue = new AuditSummaryQueue(); + summaryQueue.init(props, propPrefix); + asyncQueue.setConsumer(summaryQueue); + } else { + LOG.info("AuditSummaryQueue is disabled"); + } + if (providers.size() == 1) { - asyncQueue.setConsumer(providers.get(0)); + if (summaryEnabled) { + LOG.info("Setting " + providers.get(0).getName() + + " as consumer to AuditSummaryQueue"); + summaryQueue.setConsumer(providers.get(0)); + } else { + LOG.info("Setting " + providers.get(0).getName() + + " as consumer to " + asyncQueue.getName()); + asyncQueue.setConsumer(providers.get(0)); + } } else { MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider(); multiDestProvider.init(props); multiDestProvider.addAuditProviders(providers); - asyncQueue.setConsumer(multiDestProvider); + if (summaryEnabled) { + LOG.info("Setting " + multiDestProvider.getName() + + " as consumer to AuditSummaryQueue"); + summaryQueue.setConsumer(multiDestProvider); + } else { + LOG.info("Setting " + multiDestProvider.getName() + + " as consumer to " + asyncQueue.getName()); + asyncQueue.setConsumer(multiDestProvider); + } } mProvider = asyncQueue; + LOG.info("Starting " + mProvider.getName()); mProvider.start(); } else { - LOG.info("No v2 audit configuration found. Trying v1 audit configurations"); + LOG.info("No v3 audit configuration found. Trying v2 audit configurations"); if (!isEnabled || !(isAuditToDbEnabled || isAuditToHdfsEnabled || isAuditToKafkaEnabled || isAuditToLog4jEnabled @@ -356,7 +398,7 @@ public class AuditProviderFactory { .newInstance(); } catch (Exception e) { LOG.fatal("Can't instantiate audit class for providerName=" - + providerName + ", className=" + className); + + providerName + ", className=" + className, e); } } else { if (providerName.equals("file")) { @@ -372,7 +414,7 @@ public class AuditProviderFactory { } else if (providerName.equals("log4j")) { provider = new Log4jAuditProvider(); } else if (providerName.equals("batch")) { - provider = new AuditBatchProcessor(); + provider = new AuditBatchQueue(); } else if (providerName.equals("async")) { provider = new AuditAsyncQueue(); } else { http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/236f1ba6/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java index 576176c..85c207b 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/BaseAuditProvider.java @@ -22,6 +22,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.ranger.audit.model.AuditEventBase; import org.apache.ranger.audit.model.AuthzAuditEvent; +import org.apache.ranger.audit.queue.AuditFileSpool; import com.google.gson.GsonBuilder;

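To make the spool's recovery model concrete, below is a sketch of a single line from the spool index file (by default "index_" + fileNamePrefix + ".json"). One JSON object is written per tracked spool file; the field names, the status values (pending, write_inprogress, read_inprogress, done) and the "yyyy-MM-dd HH:mm:ss.SSS" date format follow the AuditIndexRecord class and the GsonBuilder setup in the deleted AuditFileSpool above, and the relocated queue.AuditFileSpool is assumed to keep the same layout. The id, path and timestamps are illustrative placeholders; fields that are still null are simply omitted by Gson.

{"id":"<unique-id>","filePath":"/var/log/ranger/audit/spool/spool_hdfs_20150420-1023.00.log","linePosition":128,"status":"read_inprogress","fileCreateTime":"2015-04-20 10:23:47.123","writeCompleteTime":"2015-04-20 10:53:47.456","lastSuccessTime":"2015-04-20 11:00:12.789","failedAttemptCount":0,"lastAttempt":true}

On restart, init() reloads these records: anything not marked "done" keeps the spool in the pending state, a "write_inprogress" record identifies the file to reopen for append, "read_inprogress" and then "pending" records are queued for the destination thread, and linePosition lets sendEvent() resume a partially shipped file without resending earlier lines. Once a file is fully consumed, its record is appended to the "_closed.json" done file and the log file is moved to the archive folder, where the oldest files are deleted beyond the configured maximum.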