RANGER-397 - Implement reliable streaming audits to configurable destinations
First cut with HDFS and File destination working Project: http://git-wip-us.apache.org/repos/asf/incubator-ranger/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ranger/commit/eb1e9b5c Tree: http://git-wip-us.apache.org/repos/asf/incubator-ranger/tree/eb1e9b5c Diff: http://git-wip-us.apache.org/repos/asf/incubator-ranger/diff/eb1e9b5c Branch: refs/heads/master Commit: eb1e9b5c447dc3960d132bd2dc5adfbb33d5cb2c Parents: 917833c Author: Don Bosco Durai <bo...@apache.org> Authored: Tue Apr 14 18:00:46 2015 -0700 Committer: Don Bosco Durai <bo...@apache.org> Committed: Tue Apr 14 18:02:57 2015 -0700 ---------------------------------------------------------------------- .gitignore | 1 + .../ranger/audit/model/AuditEventBase.java | 14 +- .../ranger/audit/model/AuthzAuditEvent.java | 41 +- .../audit/provider/AsyncAuditProvider.java | 5 +- .../ranger/audit/provider/AuditAsyncQueue.java | 167 ++++ .../audit/provider/AuditBatchProcessor.java | 327 +++++++ .../ranger/audit/provider/AuditDestination.java | 70 ++ .../ranger/audit/provider/AuditFileSpool.java | 875 +++++++++++++++++++ .../audit/provider/AuditMessageException.java | 67 ++ .../ranger/audit/provider/AuditProvider.java | 22 +- .../audit/provider/AuditProviderFactory.java | 386 +++++--- .../audit/provider/BaseAuditProvider.java | 400 +++++++-- .../audit/provider/BufferedAuditProvider.java | 32 +- .../ranger/audit/provider/DbAuditProvider.java | 47 +- .../audit/provider/DummyAuditProvider.java | 76 +- .../audit/provider/FileAuditDestination.java | 230 +++++ .../audit/provider/HDFSAuditDestination.java | 243 +++++ .../audit/provider/LocalFileLogBuffer.java | 17 +- .../audit/provider/Log4jAuditProvider.java | 57 +- .../ranger/audit/provider/LogDestination.java | 16 +- .../apache/ranger/audit/provider/MiscUtil.java | 291 ++++-- .../audit/provider/MultiDestAuditProvider.java | 153 +++- .../audit/provider/hdfs/HdfsAuditProvider.java | 3 +- .../audit/provider/hdfs/HdfsLogDestination.java | 48 +- 
.../provider/kafka/KafkaAuditProvider.java | 46 +- .../audit/provider/solr/SolrAuditProvider.java | 52 +- security-admin/.gitignore | 3 + security-admin/pom.xml | 6 +- .../apache/ranger/audit/TestAuditProcessor.java | 786 +++++++++++++++++ 29 files changed, 4081 insertions(+), 400 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index 2c746ed..dd4e2c2 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,6 @@ *.class *.iml +.pydevproject .settings/ .metadata .classpath http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java index 82fcab8..a44e047 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuditEventBase.java @@ -17,24 +17,26 @@ * under the License. 
*/ - package org.apache.ranger.audit.model; +package org.apache.ranger.audit.model; import org.apache.ranger.audit.dao.DaoManager; - public abstract class AuditEventBase { + protected AuditEventBase() { } public abstract void persist(DaoManager daoManager); - + protected String trim(String str, int len) { - String ret = str ; + String ret = str; if (str != null) { if (str.length() > len) { - ret = str.substring(0,len) ; + ret = str.substring(0, len); } } - return ret ; + return ret; } + + } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java index d0c1526..af89f60 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/model/AuthzAuditEvent.java @@ -24,6 +24,7 @@ import java.util.Date; import org.apache.ranger.audit.dao.DaoManager; import org.apache.ranger.audit.entity.AuthzAuditEventDbObj; +import com.google.gson.Gson; import com.google.gson.annotations.SerializedName; @@ -33,7 +34,6 @@ public class AuthzAuditEvent extends AuditEventBase { protected static final int MAX_ACTION_FIELD_SIZE = 1800 ; protected static final int MAX_REQUEST_DATA_FIELD_SIZE = 1800 ; - @SerializedName("repoType") protected int repositoryType = 0; @@ -94,6 +94,17 @@ public class AuthzAuditEvent extends AuditEventBase { @SerializedName("id") protected String eventId = null; + /** + * This to ensure order within a session. 
Order not guaranteed across processes and hosts + */ + @SerializedName("seq_num") + protected long seqNum = 0; + + @SerializedName("freq_count") + protected long frequencyCount = 1; + + @SerializedName("freq_dur_ms") + protected long frequencyDurationMS = 0; public AuthzAuditEvent() { super(); @@ -400,6 +411,31 @@ public class AuthzAuditEvent extends AuditEventBase { } + + public long getSeqNum() { + return seqNum; + } + + public void setSeqNum(long seqNum) { + this.seqNum = seqNum; + } + + public long getFrequencyCount() { + return frequencyCount; + } + + public void setFrequencyCount(long frequencyCount) { + this.frequencyCount = frequencyCount; + } + + public long getFrequencyDurationMS() { + return frequencyDurationMS; + } + + public void setFrequencyDurationMS(long frequencyDurationMS) { + this.frequencyDurationMS = frequencyDurationMS; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); @@ -432,6 +468,9 @@ public class AuthzAuditEvent extends AuditEventBase { .append("agentHostname=").append(agentHostname).append(FIELD_SEPARATOR) .append("logType=").append(logType).append(FIELD_SEPARATOR) .append("eventId=").append(eventId).append(FIELD_SEPARATOR) + .append("seq_num=").append(seqNum).append(FIELD_SEPARATOR) + .append("freq_count=").append(frequencyCount).append(FIELD_SEPARATOR) + .append("freq_dur_ms=").append(frequencyDurationMS).append(FIELD_SEPARATOR) ; return sb; } http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java index 5da5064..53adc86 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java +++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AsyncAuditProvider.java @@ -90,10 +90,11 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements } @Override - public void log(AuditEventBase event) { + public boolean log(AuditEventBase event) { LOG.debug("AsyncAuditProvider.logEvent(AuditEventBase)"); queueEvent(event); + return true; } @Override @@ -230,7 +231,7 @@ public class AsyncAuditProvider extends MultiDestAuditProvider implements return mQueue.isEmpty(); } - private void waitToComplete(long maxWaitSeconds) { + public void waitToComplete(long maxWaitSeconds) { LOG.debug("==> AsyncAuditProvider.waitToComplete()"); for (long waitTime = 0; !isEmpty() http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java new file mode 100644 index 0000000..5553bcc --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditAsyncQueue.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.provider; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.concurrent.LinkedTransferQueue; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.model.AuditEventBase; + +/** + * This is a non-blocking queue with no limit on capacity. + */ +public class AuditAsyncQueue extends BaseAuditProvider implements Runnable { + private static final Log logger = LogFactory.getLog(AuditAsyncQueue.class); + + LinkedTransferQueue<AuditEventBase> queue = new LinkedTransferQueue<AuditEventBase>(); + Thread consumerThread = null; + + static int threadCount = 0; + static final String DEFAULT_NAME = "async"; + + public AuditAsyncQueue() { + setName(DEFAULT_NAME); + } + + public AuditAsyncQueue(AuditProvider consumer) { + super(consumer); + setName(DEFAULT_NAME); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger. 
+ * audit.model.AuditEventBase) + */ + @Override + public boolean log(AuditEventBase event) { + // Add to the queue and return ASAP + if (queue.size() >= getMaxQueueSize()) { + return false; + } + queue.add(event); + addLifeTimeInLogCount(1); + return true; + } + + @Override + public boolean log(Collection<AuditEventBase> events) { + for (AuditEventBase event : events) { + log(event); + } + return true; + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#start() + */ + @Override + public void start() { + if(consumer != null) { + consumer.start(); + } + + consumerThread = new Thread(this, this.getClass().getName() + + (threadCount++)); + consumerThread.setDaemon(true); + consumerThread.start(); + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#stop() + */ + @Override + public void stop() { + setDrain(true); + try { + consumerThread.interrupt(); + } catch (Throwable t) { + // ignore any exception + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() + */ + @Override + public boolean isFlushPending() { + if (queue.isEmpty()) { + return consumer.isFlushPending(); + } + return true; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + while (true) { + try { + AuditEventBase event = null; + if (!isDrain()) { + // For Transfer queue take() is blocking + event = queue.take(); + } else { + // For Transfer queue poll() is non blocking + event = queue.poll(); + } + if (event != null) { + Collection<AuditEventBase> eventList = new ArrayList<AuditEventBase>(); + eventList.add(event); + // TODO: Put a limit. Hard coding to 1000 (use batch size + // property) + queue.drainTo(eventList, 1000 - 1); + consumer.log(eventList); + eventList.clear(); + } + } catch (InterruptedException e) { + logger.info( + "Caught exception in consumer thread. 
Mostly to about loop", + e); + } catch (Throwable t) { + logger.error("Caught error during processing request.", t); + } + if (isDrain() && queue.isEmpty()) { + break; + } + } + try { + // Call stop on the consumer + consumer.stop(); + } catch (Throwable t) { + logger.error("Error while calling stop on consumer.", t); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java new file mode 100644 index 0000000..58d122a --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditBatchProcessor.java @@ -0,0 +1,327 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.ranger.audit.provider; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Properties; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.model.AuditEventBase; + +public class AuditBatchProcessor extends BaseAuditProvider implements Runnable { + private static final Log logger = LogFactory + .getLog(AuditBatchProcessor.class); + + private BlockingQueue<AuditEventBase> queue = null; + private Collection<AuditEventBase> localBatchBuffer = new ArrayList<AuditEventBase>(); + + Thread consumerThread = null; + static int threadCount = 0; + + public AuditBatchProcessor() { + } + + public AuditBatchProcessor(AuditProvider consumer) { + super(consumer); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditProvider#log(org.apache.ranger. + * audit.model.AuditEventBase) + */ + @Override + public boolean log(AuditEventBase event) { + // Add to batchQueue. Block if full + queue.add(event); + addLifeTimeInLogCount(1); + return true; + } + + @Override + public boolean log(Collection<AuditEventBase> events) { + for (AuditEventBase event : events) { + log(event); + } + return true; + } + + @Override + public void init(Properties prop, String basePropertyName) { + String propPrefix = "xasecure.audit.batch"; + if (basePropertyName != null) { + propPrefix = basePropertyName; + } + + super.init(prop, propPrefix); + + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#start() + */ + @Override + synchronized public void start() { + if (consumerThread != null) { + logger.error("Provider is already started. 
name=" + getName()); + return; + } + logger.info("Creating ArrayBlockingQueue with maxSize=" + + getMaxQueueSize()); + queue = new ArrayBlockingQueue<AuditEventBase>(getMaxQueueSize()); + + // Start the consumer first + consumer.start(); + + // Then the FileSpooler + if (fileSpoolerEnabled) { + fileSpooler.start(); + } + + // Finally the queue listener + consumerThread = new Thread(this, this.getClass().getName() + + (threadCount++)); + consumerThread.setDaemon(true); + consumerThread.start(); + + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#stop() + */ + @Override + public void stop() { + setDrain(true); + flush(); + try { + consumerThread.interrupt(); + } catch (Throwable t) { + // ignore any exception + } + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#waitToComplete() + */ + @Override + public void waitToComplete() { + int defaultTimeOut = -1; + waitToComplete(defaultTimeOut); + consumer.waitToComplete(defaultTimeOut); + } + + @Override + public void waitToComplete(long timeout) { + setDrain(true); + flush(); + long sleepTime = 1000; + long startTime = System.currentTimeMillis(); + int prevQueueSize = -1; + int staticLoopCount = 0; + while ((queue.size() > 0 || localBatchBuffer.size() > 0)) { + if (prevQueueSize == queue.size()) { + logger.error("Queue size is not changing. " + getName() + + ".size=" + queue.size()); + staticLoopCount++; + if (staticLoopCount > 5) { + logger.error("Aborting writing to consumer. Some logs will be discarded." 
+ + getName() + ".size=" + queue.size()); + } + } else { + staticLoopCount = 0; + } + consumerThread.interrupt(); + try { + Thread.sleep(sleepTime); + if (timeout > 0 + && (System.currentTimeMillis() - startTime > timeout)) { + break; + } + } catch (InterruptedException e) { + break; + } + } + consumer.waitToComplete(timeout); + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() + */ + @Override + public boolean isFlushPending() { + if (queue.isEmpty()) { + return consumer.isFlushPending(); + } + return true; + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#flush() + */ + @Override + public void flush() { + if (fileSpoolerEnabled) { + fileSpooler.flush(); + } + consumer.flush(); + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + long lastDispatchTime = System.currentTimeMillis(); + boolean isDestActive = true; + while (true) { + // Time to next dispatch + long nextDispatchDuration = lastDispatchTime + - System.currentTimeMillis() + getMaxBatchInterval(); + + boolean isToSpool = false; + boolean fileSpoolDrain = false; + try { + if (fileSpoolerEnabled && fileSpooler.isPending()) { + int percentUsed = (getMaxQueueSize() - queue.size()) * 100 + / getMaxQueueSize(); + long lastAttemptDelta = fileSpooler + .getLastAttemptTimeDelta(); + + fileSpoolDrain = lastAttemptDelta > fileSpoolMaxWaitTime; + // If we should even read from queue? 
+ if (!isDrain() && !fileSpoolDrain + && percentUsed < fileSpoolDrainThresholdPercent) { + // Since some files are still under progress and it is + // not in drain mode, lets wait and retry + if (nextDispatchDuration > 0) { + Thread.sleep(nextDispatchDuration); + } + continue; + } + isToSpool = true; + } + + AuditEventBase event = null; + + if (!isToSpool && !isDrain() && !fileSpoolDrain + && nextDispatchDuration > 0) { + event = queue.poll(nextDispatchDuration, + TimeUnit.MILLISECONDS); + + } else { + // For poll() is non blocking + event = queue.poll(); + } + if (event != null) { + localBatchBuffer.add(event); + if (getMaxBatchSize() >= localBatchBuffer.size()) { + queue.drainTo(localBatchBuffer, getMaxBatchSize() + - localBatchBuffer.size()); + } + } + } catch (InterruptedException e) { + logger.info( + "Caught exception in consumer thread. Mostly to abort loop", + e); + } catch (Throwable t) { + logger.error("Caught error during processing request.", t); + } + + if (localBatchBuffer.size() > 0 && isToSpool) { + // Let spool to the file directly + if (isDestActive) { + logger.info("Switching to file spool. Queue=" + getName() + + ", dest=" + consumer.getName()); + } + isDestActive = false; + fileSpooler.stashLogs(localBatchBuffer); + localBatchBuffer.clear(); + // Reset all variables + lastDispatchTime = System.currentTimeMillis(); + } else if (localBatchBuffer.size() > 0 + && (isDrain() + || localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) { + if (fileSpoolerEnabled && !isDestActive) { + logger.info("Switching to writing to destination. Queue=" + + getName() + ", dest=" + consumer.getName()); + } + boolean ret = consumer.log(localBatchBuffer); + if (!ret) { + if (fileSpoolerEnabled) { + logger.info("Switching to file spool. Queue=" + + getName() + ", dest=" + consumer.getName()); + // Transient error. 
Stash and move on + fileSpooler.stashLogs(localBatchBuffer); + isDestActive = false; + } else { + // We need to drop this event + logFailedEvent(localBatchBuffer, null); + } + } else { + isDestActive = true; + } + localBatchBuffer.clear(); + // Reset all variables + lastDispatchTime = System.currentTimeMillis(); + } + + if (isDrain()) { + if (!queue.isEmpty() || localBatchBuffer.size() > 0) { + logger.info("Queue is not empty. Will retry. queue.size)=" + + queue.size() + ", localBatchBuffer.size()=" + + localBatchBuffer.size()); + } else { + break; + } + } + } + + logger.info("Exiting consumerThread. Queue=" + getName() + ", dest=" + + consumer.getName()); + try { + // Call stop on the consumer + consumer.stop(); + if (fileSpoolerEnabled) { + fileSpooler.stop(); + } + } catch (Throwable t) { + logger.error("Error while calling stop on consumer.", t); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java new file mode 100644 index 0000000..11c32ca --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditDestination.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.provider; + +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; + +/** + * This class needs to be extended by anyone who wants to build custom + * destination + */ +public abstract class AuditDestination extends BaseAuditProvider { + private static final Log logger = LogFactory.getLog(AuditDestination.class); + + public AuditDestination() { + logger.info("AuditDestination() enter"); + } + + /* + * (non-Javadoc) + * + * @see + * org.apache.ranger.audit.provider.AuditProvider#init(java.util.Properties, + * java.lang.String) + */ + @Override + public void init(Properties prop, String basePropertyName) { + super.init(prop, basePropertyName); + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#isFlushPending() + */ + @Override + public boolean isFlushPending() { + return false; + } + + /* + * (non-Javadoc) + * + * @see org.apache.ranger.audit.provider.AuditProvider#flush() + */ + @Override + public void flush() { + + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java new file mode 100644 index 0000000..8b006de --- /dev/null +++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditFileSpool.java @@ -0,0 +1,875 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.provider; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileFilter; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedTransferQueue; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.model.AuditEventBase; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +/** + * This class temporarily stores logs in file system if the destination is + * overloaded or down + */ +public class AuditFileSpool implements Runnable { + private static final Log logger = LogFactory.getLog(AuditFileSpool.class); + + public enum 
SPOOL_FILE_STATUS { + pending, write_inprogress, read_inprogress, done + } + + public static final String PROP_FILE_SPOOL_LOCAL_DIR = "filespool.dir"; + public static final String PROP_FILE_SPOOL_LOCAL_FILE_NAME = "filespool.filename.format"; + public static final String PROP_FILE_SPOOL_ARCHIVE_DIR = "filespool.archive.dir"; + public static final String PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT = "filespool.archive.max.files"; + public static final String PROP_FILE_SPOOL_FILENAME_PREFIX = "filespool.file.prefix"; + public static final String PROP_FILE_SPOOL_FILE_ROLLOVER = "filespool.file.rollover.sec"; + public static final String PROP_FILE_SPOOL_INDEX_FILE = "filespool.index.filename"; + // public static final String PROP_FILE_SPOOL_INDEX_DONE_FILE = + // "filespool.index.done_filename"; + public static final String PROP_FILE_SPOOL_DEST_RETRY_MS = "filespool.destination.retry.ms"; + + AuditProvider queueProvider = null; + AuditProvider consumerProvider = null; + + BlockingQueue<AuditIndexRecord> indexQueue = new LinkedTransferQueue<AuditIndexRecord>(); + + // Folder and File attributes + File logFolder = null; + String logFileNameFormat = null; + File archiveFolder = null; + String fileNamePrefix = null; + String indexFileName = null; + File indexFile = null; + String indexDoneFileName = null; + File indexDoneFile = null; + int retryDestinationMS = 30 * 1000; // Default 30 seconds + int fileRolloverSec = 24 * 60 * 60; // In seconds + int maxArchiveFiles = 100; + + int errorLogIntervalMS = 30 * 1000; // Every 30 seconds + long lastErrorLogMS = 0; + + List<AuditIndexRecord> indexRecords = new ArrayList<AuditIndexRecord>(); + + boolean isPending = false; + long lastAttemptTime = 0; + boolean initDone = false; + + PrintWriter logWriter = null; + AuditIndexRecord currentWriterIndexRecord = null; + AuditIndexRecord currentConsumerIndexRecord = null; + + BufferedReader logReader = null; + + Thread destinationThread = null; + + boolean isWriting = true; + boolean isDrain = 
false; + boolean isDestDown = true; + + private static Gson gson = null; + + public AuditFileSpool(AuditProvider queueProvider, + AuditProvider consumerProvider) { + this.queueProvider = queueProvider; + this.consumerProvider = consumerProvider; + } + + public void init(Properties prop) { + init(prop, null); + } + + public void init(Properties props, String basePropertyName) { + if (initDone) { + logger.error("init() called more than once. queueProvider=" + + queueProvider.getName() + ", consumerProvider=" + + consumerProvider.getName()); + return; + } + String propPrefix = "xasecure.audit.filespool"; + if (basePropertyName != null) { + propPrefix = basePropertyName; + } + + try { + gson = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS") + .create(); + + // Initial folder and file properties + String logFolderProp = MiscUtil.getStringProperty(props, propPrefix + + "." + PROP_FILE_SPOOL_LOCAL_DIR); + logFileNameFormat = MiscUtil.getStringProperty(props, + basePropertyName + "." + PROP_FILE_SPOOL_LOCAL_FILE_NAME); + String archiveFolderProp = MiscUtil.getStringProperty(props, + propPrefix + "." + PROP_FILE_SPOOL_ARCHIVE_DIR); + fileNamePrefix = MiscUtil.getStringProperty(props, propPrefix + "." + + PROP_FILE_SPOOL_FILENAME_PREFIX); + indexFileName = MiscUtil.getStringProperty(props, propPrefix + "." + + PROP_FILE_SPOOL_INDEX_FILE); + retryDestinationMS = MiscUtil.getIntProperty(props, propPrefix + + "." + PROP_FILE_SPOOL_DEST_RETRY_MS, retryDestinationMS); + fileRolloverSec = MiscUtil.getIntProperty(props, propPrefix + "." + + PROP_FILE_SPOOL_FILE_ROLLOVER, fileRolloverSec); + maxArchiveFiles = MiscUtil.getIntProperty(props, propPrefix + "." 
+ + PROP_FILE_SPOOL_ARCHIVE_MAX_FILES_COUNT, maxArchiveFiles); + + logger.info("retryDestinationMS=" + retryDestinationMS + + ", queueName=" + queueProvider.getName()); + logger.info("fileRolloverSec=" + fileRolloverSec + ", queueName=" + + queueProvider.getName()); + logger.info("maxArchiveFiles=" + maxArchiveFiles + ", queueName=" + + queueProvider.getName()); + + if (logFolderProp == null || logFolderProp.isEmpty()) { + logger.error("Audit spool folder is not configured. Please set " + + propPrefix + + "." + + PROP_FILE_SPOOL_LOCAL_DIR + + ". queueName=" + queueProvider.getName()); + return; + } + logFolder = new File(logFolderProp); + if (!logFolder.isDirectory()) { + logFolder.mkdirs(); + if (!logFolder.isDirectory()) { + logger.error("File Spool folder not found and can't be created. folder=" + + logFolder.getAbsolutePath() + + ", queueName=" + + queueProvider.getName()); + return; + } + } + logger.info("logFolder=" + logFolder + ", queueName=" + + queueProvider.getName()); + + if (logFileNameFormat == null || logFileNameFormat.isEmpty()) { + logFileNameFormat = "spool_" + "%app-type%" + "_" + + "%time:yyyyMMdd-HHmm.ss%.log"; + } + logger.info("logFileNameFormat=" + logFileNameFormat + + ", queueName=" + queueProvider.getName()); + + if (archiveFolderProp == null || archiveFolderProp.isEmpty()) { + archiveFolder = new File(logFolder, "archive"); + } else { + archiveFolder = new File(archiveFolderProp); + } + if (!archiveFolder.isDirectory()) { + archiveFolder.mkdirs(); + if (!archiveFolder.isDirectory()) { + logger.error("File Spool archive folder not found and can't be created. 
folder=" + + archiveFolder.getAbsolutePath() + + ", queueName=" + + queueProvider.getName()); + return; + } + } + logger.info("archiveFolder=" + archiveFolder + ", queueName=" + + queueProvider.getName()); + + if (indexFileName == null || indexFileName.isEmpty()) { + indexFileName = "index_" + fileNamePrefix + ".json"; + } + + indexFile = new File(logFolder, indexFileName); + if (!indexFile.exists()) { + indexFile.createNewFile(); + } + logger.info("indexFile=" + indexFile + ", queueName=" + + queueProvider.getName()); + + int lastDot = indexFileName.lastIndexOf('.'); + indexDoneFileName = indexFileName.substring(0, lastDot) + + "_closed.json"; + indexDoneFile = new File(logFolder, indexDoneFileName); + if (!indexDoneFile.exists()) { + indexDoneFile.createNewFile(); + } + logger.info("indexDoneFile=" + indexDoneFile + ", queueName=" + + queueProvider.getName()); + + // Load index file + loadIndexFile(); + for (AuditIndexRecord auditIndexRecord : indexRecords) { + if (!auditIndexRecord.status.equals(SPOOL_FILE_STATUS.done)) { + isPending = true; + } + if (auditIndexRecord.status + .equals(SPOOL_FILE_STATUS.write_inprogress)) { + currentWriterIndexRecord = auditIndexRecord; + logger.info("currentWriterIndexRecord=" + + currentWriterIndexRecord.filePath + + ", queueName=" + queueProvider.getName()); + } + if (auditIndexRecord.status + .equals(SPOOL_FILE_STATUS.read_inprogress)) { + indexQueue.add(auditIndexRecord); + } + } + printIndex(); + // One more loop to add the rest of the pending records in reverse + // order + for (int i = 0; i < indexRecords.size(); i++) { + AuditIndexRecord auditIndexRecord = indexRecords.get(i); + if (auditIndexRecord.status.equals(SPOOL_FILE_STATUS.pending)) { + File consumerFile = new File(auditIndexRecord.filePath); + if (!consumerFile.exists()) { + logger.error("INIT: Consumer file=" + + consumerFile.getPath() + " not found."); + System.exit(1); + } + indexQueue.add(auditIndexRecord); + } + } + + } catch (Throwable t) { + 
logger.fatal("Error initializing File Spooler. queue=" + + queueProvider.getName(), t); + return; + } + initDone = true; + } + + /** + * Start looking for outstanding logs and update status according. + */ + public void start() { + if (!initDone) { + logger.error("Cannot start Audit File Spooler. Initilization not done yet. queueName=" + + queueProvider.getName()); + return; + } + + logger.info("Starting writerThread, queueName=" + + queueProvider.getName() + ", consumer=" + + consumerProvider.getName()); + + // Let's start the thread to read + destinationThread = new Thread(this, queueProvider.getName() + + "_destWriter"); + destinationThread.setDaemon(true); + destinationThread.start(); + } + + public void stop() { + if (!initDone) { + logger.error("Cannot stop Audit File Spooler. Initilization not done. queueName=" + + queueProvider.getName()); + return; + } + logger.info("Stop called, queueName=" + queueProvider.getName() + + ", consumer=" + consumerProvider.getName()); + + isDrain = true; + flush(); + + PrintWriter out = getOpenLogFileStream(); + if (out != null) { + // If write is still going on, then let's give it enough time to + // complete + for (int i = 0; i < 3; i++) { + if (isWriting) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + // ignore + } + continue; + } + try { + logger.info("Closing open file, queueName=" + + queueProvider.getName() + ", consumer=" + + consumerProvider.getName()); + + out.flush(); + out.close(); + } catch (Throwable t) { + logger.debug("Error closing spool out file.", t); + } + } + } + try { + destinationThread.interrupt(); + } catch (Throwable e) { + // ignore + } + } + + public void flush() { + if (!initDone) { + logger.error("Cannot flush Audit File Spooler. Initilization not done. queueName=" + + queueProvider.getName()); + return; + } + PrintWriter out = getOpenLogFileStream(); + if (out != null) { + out.flush(); + } + } + + /** + * If any files are still not processed. 
Also, if the destination is not + * reachable + * + * @return + */ + public boolean isPending() { + if (!initDone) { + logError("isPending(): File Spooler not initialized. queueName=" + + queueProvider.getName()); + return false; + } + + return isPending; + } + + /** + * Milliseconds from last attempt time + * + * @return + */ + public long getLastAttemptTimeDelta() { + if (lastAttemptTime == 0) { + return 0; + } + return System.currentTimeMillis() - lastAttemptTime; + } + + synchronized public void stashLogs(AuditEventBase event) { + if (isDrain) { + // Stop has been called, so this method shouldn't be called + logger.error("stashLogs() is called after stop is called. event=" + + event); + return; + } + try { + isWriting = true; + PrintWriter logOut = getLogFileStream(); + // Convert event to json + String jsonStr = MiscUtil.stringify(event); + logOut.println(jsonStr); + isPending = true; + } catch (Exception ex) { + logger.error("Error writing to file. event=" + event, ex); + } finally { + isWriting = false; + } + + } + + synchronized public void stashLogs(Collection<AuditEventBase> events) { + for (AuditEventBase event : events) { + stashLogs(event); + } + flush(); + } + + synchronized public void stashLogsString(String event) { + if (isDrain) { + // Stop has been called, so this method shouldn't be called + logger.error("stashLogs() is called after stop is called. event=" + + event); + return; + } + try { + isWriting = true; + PrintWriter logOut = getLogFileStream(); + logOut.println(event); + } catch (Exception ex) { + logger.error("Error writing to file. event=" + event, ex); + } finally { + isWriting = false; + } + + } + + synchronized public void stashLogsString(Collection<String> events) { + for (String event : events) { + stashLogsString(event); + } + flush(); + } + + /** + * This return the current file. 
If there are not current open output file, + * then it will return null + * + * @return + * @throws Exception + */ + synchronized private PrintWriter getOpenLogFileStream() { + return logWriter; + } + + /** + * @return + * @throws Exception + */ + synchronized private PrintWriter getLogFileStream() throws Exception { + closeFileIfNeeded(); + + // Either there are no open log file or the previous one has been rolled + // over + if (currentWriterIndexRecord == null) { + Date currentTime = new Date(); + // Create a new file + String fileName = MiscUtil.replaceTokens(logFileNameFormat, + currentTime.getTime()); + String newFileName = fileName; + File outLogFile = null; + int i = 0; + while (true) { + outLogFile = new File(logFolder, newFileName); + File archiveLogFile = new File(archiveFolder, newFileName); + if (!outLogFile.exists() && !archiveLogFile.exists()) { + break; + } + i++; + int lastDot = fileName.lastIndexOf('.'); + String baseName = fileName.substring(0, lastDot); + String extension = fileName.substring(lastDot); + newFileName = baseName + "." + i + extension; + } + fileName = newFileName; + logger.info("Creating new file. queueName=" + + queueProvider.getName() + ", fileName=" + fileName); + // Open the file + logWriter = new PrintWriter(new BufferedWriter(new FileWriter( + outLogFile))); + + AuditIndexRecord tmpIndexRecord = new AuditIndexRecord(); + + tmpIndexRecord.id = MiscUtil.generateUniqueId(); + tmpIndexRecord.filePath = outLogFile.getPath(); + tmpIndexRecord.status = SPOOL_FILE_STATUS.write_inprogress; + tmpIndexRecord.fileCreateTime = currentTime; + tmpIndexRecord.lastAttempt = true; + currentWriterIndexRecord = tmpIndexRecord; + indexRecords.add(currentWriterIndexRecord); + saveIndexFile(); + + } else { + if (logWriter == null) { + // This means the process just started. We need to open the file + // in append mode. + logger.info("Opening existing file for append. 
queueName=" + + queueProvider.getName() + ", fileName=" + + currentWriterIndexRecord.filePath); + logWriter = new PrintWriter(new BufferedWriter(new FileWriter( + currentWriterIndexRecord.filePath, true))); + } + } + return logWriter; + } + + synchronized private void closeFileIfNeeded() throws FileNotFoundException, + IOException { + // Is there file open to write or there are no pending file, then close + // the active file + if (currentWriterIndexRecord != null) { + // Check whether the file needs to rolled + boolean closeFile = false; + if (indexRecords.size() == 1) { + closeFile = true; + logger.info("Closing file. Only one open file. queueName=" + + queueProvider.getName() + ", fileName=" + + currentWriterIndexRecord.filePath); + } else if (System.currentTimeMillis() + - currentWriterIndexRecord.fileCreateTime.getTime() > fileRolloverSec * 1000) { + closeFile = true; + logger.info("Closing file. Rolling over. queueName=" + + queueProvider.getName() + ", fileName=" + + currentWriterIndexRecord.filePath); + } + if (closeFile) { + // Roll the file + if (logWriter != null) { + logWriter.flush(); + logWriter.close(); + logWriter = null; + } + currentWriterIndexRecord.status = SPOOL_FILE_STATUS.pending; + currentWriterIndexRecord.writeCompleteTime = new Date(); + saveIndexFile(); + logger.info("Adding file to queue. queueName=" + + queueProvider.getName() + ", fileName=" + + currentWriterIndexRecord.filePath); + indexQueue.add(currentWriterIndexRecord); + currentWriterIndexRecord = null; + } + } + } + + /** + * Load the index file + * + * @throws IOException + */ + void loadIndexFile() throws IOException { + logger.info("Loading index file. 
fileName=" + indexFile.getPath()); + BufferedReader br = new BufferedReader(new FileReader(indexFile)); + indexRecords.clear(); + String line; + while ((line = br.readLine()) != null) { + if (!line.isEmpty() && !line.startsWith("#")) { + AuditIndexRecord record = gson.fromJson(line, + AuditIndexRecord.class); + indexRecords.add(record); + } + } + br.close(); + } + + synchronized void printIndex() { + logger.info("INDEX printIndex() ==== START"); + Iterator<AuditIndexRecord> iter = indexRecords.iterator(); + while (iter.hasNext()) { + AuditIndexRecord record = iter.next(); + logger.info("INDEX=" + record + ", isFileExist=" + + (new File(record.filePath).exists())); + } + logger.info("INDEX printIndex() ==== END"); + } + + synchronized void removeIndexRecord(AuditIndexRecord indexRecord) + throws FileNotFoundException, IOException { + Iterator<AuditIndexRecord> iter = indexRecords.iterator(); + while (iter.hasNext()) { + AuditIndexRecord record = iter.next(); + if (record.id.equals(indexRecord.id)) { + logger.info("Removing file from index. file=" + record.filePath + + ", queueName=" + queueProvider.getName() + + ", consumer=" + consumerProvider.getName()); + + iter.remove(); + appendToDoneFile(record); + } + } + saveIndexFile(); + } + + synchronized void saveIndexFile() throws FileNotFoundException, IOException { + PrintWriter out = new PrintWriter(indexFile); + for (AuditIndexRecord auditIndexRecord : indexRecords) { + out.println(gson.toJson(auditIndexRecord)); + } + out.close(); + // printIndex(); + + } + + void appendToDoneFile(AuditIndexRecord indexRecord) + throws FileNotFoundException, IOException { + logger.info("Moving to done file. 
" + indexRecord.filePath + + ", queueName=" + queueProvider.getName() + ", consumer=" + + consumerProvider.getName()); + String line = gson.toJson(indexRecord); + PrintWriter out = new PrintWriter(new BufferedWriter(new FileWriter( + indexDoneFile, true))); + out.println(line); + out.flush(); + out.close(); + + // Move to archive folder + File logFile = null; + File archiveFile = null; + try { + logFile = new File(indexRecord.filePath); + String fileName = logFile.getName(); + archiveFile = new File(archiveFolder, fileName); + logger.info("Moving logFile " + logFile + " to " + archiveFile); + logFile.renameTo(archiveFile); + } catch (Throwable t) { + logger.error("Error moving log file to archive folder. logFile=" + + logFile + ", archiveFile=" + archiveFile, t); + } + + archiveFile = null; + try { + // Remove old files + File[] logFiles = archiveFolder.listFiles(new FileFilter() { + public boolean accept(File pathname) { + return pathname.getName().toLowerCase().endsWith(".log"); + } + }); + + if (logFiles.length > maxArchiveFiles) { + int filesToDelete = logFiles.length - maxArchiveFiles; + BufferedReader br = new BufferedReader(new FileReader( + indexDoneFile)); + try { + int filesDeletedCount = 0; + while ((line = br.readLine()) != null) { + if (!line.isEmpty() && !line.startsWith("#")) { + AuditIndexRecord record = gson.fromJson(line, + AuditIndexRecord.class); + logFile = new File(record.filePath); + String fileName = logFile.getName(); + archiveFile = new File(archiveFolder, fileName); + if (archiveFile.exists()) { + logger.info("Deleting archive file " + + archiveFile); + boolean ret = archiveFile.delete(); + if (!ret) { + logger.error("Error deleting archive file. 
archiveFile=" + + archiveFile); + } + filesDeletedCount++; + if (filesDeletedCount >= filesToDelete) { + logger.info("Deleted " + filesDeletedCount + + " files"); + break; + } + } + } + } + } finally { + br.close(); + } + } + } catch (Throwable t) { + logger.error("Error deleting older archive file. archiveFile=" + + archiveFile, t); + } + + } + + void logError(String msg) { + long currTimeMS = System.currentTimeMillis(); + if (currTimeMS - lastErrorLogMS > errorLogIntervalMS) { + logger.error(msg); + lastErrorLogMS = currTimeMS; + } + } + + class AuditIndexRecord { + String id; + String filePath; + int linePosition = 0; + SPOOL_FILE_STATUS status = SPOOL_FILE_STATUS.write_inprogress; + Date fileCreateTime; + Date writeCompleteTime; + Date doneCompleteTime; + Date lastSuccessTime; + Date lastFailedTime; + int failedAttemptCount = 0; + boolean lastAttempt = false; + + @Override + public String toString() { + return "AuditIndexRecord [id=" + id + ", filePath=" + filePath + + ", linePosition=" + linePosition + ", status=" + status + + ", fileCreateTime=" + fileCreateTime + + ", writeCompleteTime=" + writeCompleteTime + + ", doneCompleteTime=" + doneCompleteTime + + ", lastSuccessTime=" + lastSuccessTime + + ", lastFailedTime=" + lastFailedTime + + ", failedAttemptCount=" + failedAttemptCount + + ", lastAttempt=" + lastAttempt + "]"; + } + + } + + class AuditFileSpoolAttempt { + Date attemptTime; + String status; + } + + /* + * (non-Javadoc) + * + * @see java.lang.Runnable#run() + */ + @Override + public void run() { + while (true) { + try { + // Let's pause between each iteration + if (currentConsumerIndexRecord == null) { + currentConsumerIndexRecord = indexQueue.poll( + retryDestinationMS, TimeUnit.MILLISECONDS); + } else { + Thread.sleep(retryDestinationMS); + } + + if (isDrain) { + // Need to exit + break; + } + if (currentConsumerIndexRecord == null) { + closeFileIfNeeded(); + continue; + } + + boolean isRemoveIndex = false; + File consumerFile = new File( + 
currentConsumerIndexRecord.filePath); + if (!consumerFile.exists()) { + logger.error("Consumer file=" + consumerFile.getPath() + + " not found."); + printIndex(); + isRemoveIndex = true; + } else { + // Let's open the file to write + BufferedReader br = new BufferedReader(new FileReader( + currentConsumerIndexRecord.filePath)); + try { + int startLine = currentConsumerIndexRecord.linePosition; + String line; + int currLine = 0; + boolean isResumed = false; + List<String> lines = new ArrayList<String>(); + while ((line = br.readLine()) != null) { + currLine++; + if (currLine < startLine) { + continue; + } + lines.add(line); + if (lines.size() == queueProvider.getMaxBatchSize()) { + boolean ret = sendEvent(lines, + currentConsumerIndexRecord, currLine); + if (!ret) { + throw new Exception("Destination down"); + } else { + if (!isResumed) { + logger.info("Started writing to destination. file=" + + currentConsumerIndexRecord.filePath + + ", queueName=" + + queueProvider.getName() + + ", consumer=" + + consumerProvider.getName()); + } + } + lines.clear(); + } + } + if (lines.size() > 0) { + boolean ret = sendEvent(lines, + currentConsumerIndexRecord, currLine); + if (!ret) { + throw new Exception("Destination down"); + } else { + if (!isResumed) { + logger.info("Started writing to destination. file=" + + currentConsumerIndexRecord.filePath + + ", queueName=" + + queueProvider.getName() + + ", consumer=" + + consumerProvider.getName()); + } + } + lines.clear(); + } + logger.info("Done reading file. file=" + + currentConsumerIndexRecord.filePath + + ", queueName=" + queueProvider.getName() + + ", consumer=" + consumerProvider.getName()); + // The entire file is read + currentConsumerIndexRecord.status = SPOOL_FILE_STATUS.done; + currentConsumerIndexRecord.doneCompleteTime = new Date(); + currentConsumerIndexRecord.lastAttempt = true; + + isRemoveIndex = true; + } catch (Exception ex) { + isDestDown = true; + logError("Destination down. 
queueName=" + + queueProvider.getName() + ", consumer=" + + consumerProvider.getName()); + lastAttemptTime = System.currentTimeMillis(); + // Update the index file + currentConsumerIndexRecord.lastFailedTime = new Date(); + currentConsumerIndexRecord.failedAttemptCount++; + currentConsumerIndexRecord.lastAttempt = false; + saveIndexFile(); + } finally { + br.close(); + } + } + if (isRemoveIndex) { + // Remove this entry from index + removeIndexRecord(currentConsumerIndexRecord); + currentConsumerIndexRecord = null; + closeFileIfNeeded(); + } + } catch (Throwable t) { + logger.error("Exception in destination writing thread.", t); + } + } + logger.info("Exiting file spooler. provider=" + queueProvider.getName() + + ", consumer=" + consumerProvider.getName()); + } + + private boolean sendEvent(List<String> lines, AuditIndexRecord indexRecord, + int currLine) { + boolean ret = true; + try { + ret = consumerProvider.logJSON(lines); + if (!ret) { + // Need to log error after fixed interval + logError("Error sending logs to consumer. provider=" + + queueProvider.getName() + ", consumer=" + + consumerProvider.getName()); + } else { + // Update index and save + indexRecord.linePosition = currLine; + indexRecord.status = SPOOL_FILE_STATUS.read_inprogress; + indexRecord.lastSuccessTime = new Date(); + indexRecord.lastAttempt = true; + saveIndexFile(); + + if (isDestDown) { + isDestDown = false; + logger.info("Destination up now. " + indexRecord.filePath + + ", queueName=" + queueProvider.getName() + + ", consumer=" + consumerProvider.getName()); + } + } + } catch (Throwable t) { + logger.error("Error while sending logs to consumer. 
provider=" + + queueProvider.getName() + ", consumer=" + + consumerProvider.getName() + ", log=" + lines, t); + } + + return ret; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java new file mode 100644 index 0000000..3ef3e30 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditMessageException.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.ranger.audit.provider; + +/** + * This exception should be thrown only when there is an error in the message + * itself. E.g. invalid field type, etc. 
Don't throw this exception if there is + * a transient error + */ +public class AuditMessageException extends Exception { + + private static final long serialVersionUID = 1L; + + public AuditMessageException() { + } + + /** + * Creates the exception with a detail message only. + * + * @param message + * detail message, passed to the superclass constructor + */ + public AuditMessageException(String message) { + super(message); + } + + /** + * Creates the exception around an underlying cause. + * + * @param cause + * underlying cause, passed to the superclass constructor + */ + public AuditMessageException(Throwable cause) { + super(cause); + } + + /** + * Creates the exception with a detail message and an underlying cause. + * + * @param message + * detail message, passed to the superclass constructor + * @param cause + * underlying cause, passed to the superclass constructor + */ + public AuditMessageException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Creates the exception with full control over suppression and stack + * trace capture; all four arguments are passed unchanged to the + * superclass constructor. + * + * @param message + * detail message + * @param cause + * underlying cause + * @param enableSuppression + * forwarded to the superclass as its enableSuppression flag + * @param writableStackTrace + * forwarded to the superclass as its writableStackTrace flag + */ + public AuditMessageException(String message, Throwable cause, + boolean enableSuppression, boolean writableStackTrace) { + super(message, cause, enableSuppression, writableStackTrace); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java index 47c2d7f..0e38624 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProvider.java @@ -18,18 +18,38 @@ package org.apache.ranger.audit.provider; +import java.util.Collection; import java.util.Properties; import org.apache.ranger.audit.model.AuditEventBase; public interface AuditProvider { - public void log(AuditEventBase event); + public boolean log(AuditEventBase event); + public boolean log(Collection<AuditEventBase> events); + + public boolean logJSON(String event); + public boolean logJSON(Collection<String> events); public void init(Properties prop); + public void init(Properties prop, String 
basePropertyName); public void start(); public void stop(); public void waitToComplete(); + public void waitToComplete(long timeout); + + /** + * Name for this provider. Used only during logging. Uniqueness is not guaranteed + */ + public String getName(); + /** + * If this AuditProvider in the state of shutdown + * @return + */ + public boolean isDrain(); + + public int getMaxBatchSize(); + public int getMaxBatchInterval(); public boolean isFlushPending(); public long getLastFlushTime(); public void flush(); http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/eb1e9b5c/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java index bb8fa6d..13b3142 100644 --- a/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/AuditProviderFactory.java @@ -28,7 +28,6 @@ import org.apache.ranger.audit.provider.hdfs.HdfsAuditProvider; import org.apache.ranger.audit.provider.kafka.KafkaAuditProvider; import org.apache.ranger.audit.provider.solr.SolrAuditProvider; - /* * TODO: * 1) Flag to enable/disable audit logging @@ -37,22 +36,25 @@ import org.apache.ranger.audit.provider.solr.SolrAuditProvider; */ public class AuditProviderFactory { - private static final Log LOG = LogFactory.getLog(AuditProviderFactory.class); - - private static final String AUDIT_IS_ENABLED_PROP = "xasecure.audit.is.enabled" ; - private static final String AUDIT_DB_IS_ENABLED_PROP = "xasecure.audit.db.is.enabled" ; - private static final String AUDIT_HDFS_IS_ENABLED_PROP = "xasecure.audit.hdfs.is.enabled"; - private static final String AUDIT_LOG4J_IS_ENABLED_PROP = "xasecure.audit.log4j.is.enabled" ; - private static 
final String AUDIT_KAFKA_IS_ENABLED_PROP = "xasecure.audit.kafka.is.enabled"; - private static final String AUDIT_SOLR_IS_ENABLED_PROP = "xasecure.audit.solr.is.enabled"; - - private static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024; - private static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT = 5 * 1000; - + private static final Log LOG = LogFactory + .getLog(AuditProviderFactory.class); + + public static final String AUDIT_IS_ENABLED_PROP = "xasecure.audit.is.enabled"; + public static final String AUDIT_DB_IS_ENABLED_PROP = "xasecure.audit.db.is.enabled"; + public static final String AUDIT_HDFS_IS_ENABLED_PROP = "xasecure.audit.hdfs.is.enabled"; + public static final String AUDIT_LOG4J_IS_ENABLED_PROP = "xasecure.audit.log4j.is.enabled"; + public static final String AUDIT_KAFKA_IS_ENABLED_PROP = "xasecure.audit.kafka.is.enabled"; + public static final String AUDIT_SOLR_IS_ENABLED_PROP = "xasecure.audit.solr.is.enabled"; + + public static final String AUDIT_DEST_BASE = "xasecure.audit.destination"; + + public static final int AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT = 10 * 1024; + public static final int AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT = 5 * 1000; + private static AuditProviderFactory sFactory; private AuditProvider mProvider = null; - private boolean mInitDone = false; + private boolean mInitDone = false; private AuditProviderFactory() { LOG.info("AuditProviderFactory: creating.."); @@ -61,9 +63,9 @@ public class AuditProviderFactory { } public static AuditProviderFactory getInstance() { - if(sFactory == null) { - synchronized(AuditProviderFactory.class) { - if(sFactory == null) { + if (sFactory == null) { + synchronized (AuditProviderFactory.class) { + if (sFactory == null) { sFactory = new AuditProviderFactory(); } } @@ -75,7 +77,7 @@ public class AuditProviderFactory { public static AuditProvider getAuditProvider() { return AuditProviderFactory.getInstance().getProvider(); } - + public AuditProvider getProvider() { return mProvider; } @@ 
-86,133 +88,301 @@ public class AuditProviderFactory { public synchronized void init(Properties props, String appType) { LOG.info("AuditProviderFactory: initializing.."); - - if(mInitDone) { - LOG.warn("AuditProviderFactory.init(): already initialized!", new Exception()); + + if (mInitDone) { + LOG.warn("AuditProviderFactory.init(): already initialized!", + new Exception()); return; } mInitDone = true; - - MiscUtil.setApplicationType(appType); - boolean isEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_IS_ENABLED_PROP, false); - boolean isAuditToDbEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ENABLED_PROP, false); - boolean isAuditToHdfsEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_HDFS_IS_ENABLED_PROP, false); - boolean isAuditToLog4jEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_LOG4J_IS_ENABLED_PROP, false); - boolean isAuditToKafkaEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_KAFKA_IS_ENABLED_PROP, false); - boolean isAuditToSolrEnabled = BaseAuditProvider.getBooleanProperty(props, AUDIT_SOLR_IS_ENABLED_PROP, false); + MiscUtil.setApplicationType(appType); - if(!isEnabled || !(isAuditToDbEnabled || isAuditToHdfsEnabled || isAuditToKafkaEnabled || isAuditToLog4jEnabled || isAuditToSolrEnabled)) { - LOG.info("AuditProviderFactory: Audit not enabled.."); + boolean isEnabled = MiscUtil.getBooleanProperty(props, + AUDIT_IS_ENABLED_PROP, false); + boolean isAuditToDbEnabled = MiscUtil.getBooleanProperty(props, + AUDIT_DB_IS_ENABLED_PROP, false); + boolean isAuditToHdfsEnabled = MiscUtil.getBooleanProperty(props, + AUDIT_HDFS_IS_ENABLED_PROP, false); + boolean isAuditToLog4jEnabled = MiscUtil.getBooleanProperty(props, + AUDIT_LOG4J_IS_ENABLED_PROP, false); + boolean isAuditToKafkaEnabled = MiscUtil.getBooleanProperty(props, + AUDIT_KAFKA_IS_ENABLED_PROP, false); + boolean isAuditToSolrEnabled = MiscUtil.getBooleanProperty(props, + AUDIT_SOLR_IS_ENABLED_PROP, false); - mProvider = 
getDefaultProvider(); + List<AuditProvider> providers = new ArrayList<AuditProvider>(); - return; + // TODO: Delete me + for (Object propNameObj : props.keySet()) { + LOG.info("DELETE ME: " + propNameObj.toString() + "=" + + props.getProperty(propNameObj.toString())); } - List<AuditProvider> providers = new ArrayList<AuditProvider>(); + // Process new audit configurations + List<String> destNameList = new ArrayList<String>(); - if(isAuditToDbEnabled) { - LOG.info("DbAuditProvider is enabled"); - DbAuditProvider dbProvider = new DbAuditProvider(); - - boolean isAuditToDbAsync = BaseAuditProvider.getBooleanProperty(props, DbAuditProvider.AUDIT_DB_IS_ASYNC_PROP, false); + for (Object propNameObj : props.keySet()) { + String propName = propNameObj.toString(); + if (propName.length() <= AUDIT_DEST_BASE.length() + 1) { + continue; + } + String destName = propName.substring(AUDIT_DEST_BASE.length() + 1); + List<String> splits = MiscUtil.toArray(destName, "."); + if (splits.size() > 1) { + continue; + } + String value = props.getProperty(propName); + if (value.equalsIgnoreCase("enable") + || value.equalsIgnoreCase("true")) { + destNameList.add(destName); + LOG.info("Audit destination " + propName + " is set to " + + value); + } + } - if(isAuditToDbAsync) { - int maxQueueSize = BaseAuditProvider.getIntProperty(props, DbAuditProvider.AUDIT_DB_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT); - int maxFlushInterval = BaseAuditProvider.getIntProperty(props, DbAuditProvider.AUDIT_DB_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT); + for (String destName : destNameList) { + String destPropPrefix = AUDIT_DEST_BASE + "." 
+ destName; + AuditProvider destProvider = getProviderFromConfig(props, + destPropPrefix, destName); - AsyncAuditProvider asyncProvider = new AsyncAuditProvider("DbAuditProvider", maxQueueSize, maxFlushInterval, dbProvider); + if (destProvider != null) { + destProvider.init(props, destPropPrefix); - providers.add(asyncProvider); - } else { - providers.add(dbProvider); + String queueName = MiscUtil.getStringProperty(props, + destPropPrefix + "." + BaseAuditProvider.PROP_QUEUE); + if( queueName == null || queueName.isEmpty()) { + queueName = "batch"; + } + if (queueName != null && !queueName.isEmpty() + && !queueName.equalsIgnoreCase("none")) { + String queuePropPrefix = destPropPrefix + "." + queueName; + AuditProvider queueProvider = getProviderFromConfig(props, + queuePropPrefix, queueName); + if (queueProvider != null) { + if (queueProvider instanceof BaseAuditProvider) { + BaseAuditProvider qProvider = (BaseAuditProvider) queueProvider; + qProvider.setConsumer(destProvider); + qProvider.init(props, queuePropPrefix); + providers.add(queueProvider); + } else { + LOG.fatal("Provider queue doesn't extend BaseAuditProvider destination " + + destName + + " can't be created. queueName=" + + queueName); + } + } else { + LOG.fatal("Queue provider for destination " + destName + + " can't be created. queueName=" + queueName); + } + } else { + LOG.info("Audit destination " + destProvider.getName() + + " added to provider list"); + providers.add(destProvider); + } } } + if (providers.size() > 0) { + LOG.info("Using v2 audit configuration"); + AuditAsyncQueue asyncQueue = new AuditAsyncQueue(); + String propPrefix = BaseAuditProvider.PROP_DEFAULT_PREFIX + "." 
+ "async"; + asyncQueue.init(props, propPrefix); + + if (providers.size() == 1) { + asyncQueue.setConsumer(providers.get(0)); + } else { + MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider(); + multiDestProvider.init(props); + multiDestProvider.addAuditProviders(providers); + asyncQueue.setConsumer(multiDestProvider); + } - if(isAuditToHdfsEnabled) { - LOG.info("HdfsAuditProvider is enabled"); + mProvider = asyncQueue; + mProvider.start(); + } else { + LOG.info("No v2 audit configuration found. Trying v1 audit configurations"); + if (!isEnabled + || !(isAuditToDbEnabled || isAuditToHdfsEnabled + || isAuditToKafkaEnabled || isAuditToLog4jEnabled + || isAuditToSolrEnabled || providers.size() == 0)) { + LOG.info("AuditProviderFactory: Audit not enabled.."); - HdfsAuditProvider hdfsProvider = new HdfsAuditProvider(); + mProvider = getDefaultProvider(); - boolean isAuditToHdfsAsync = BaseAuditProvider.getBooleanProperty(props, HdfsAuditProvider.AUDIT_HDFS_IS_ASYNC_PROP, false); + return; + } - if(isAuditToHdfsAsync) { - int maxQueueSize = BaseAuditProvider.getIntProperty(props, HdfsAuditProvider.AUDIT_HDFS_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT); - int maxFlushInterval = BaseAuditProvider.getIntProperty(props, HdfsAuditProvider.AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT); + if (isAuditToDbEnabled) { + LOG.info("DbAuditProvider is enabled"); + DbAuditProvider dbProvider = new DbAuditProvider(); - AsyncAuditProvider asyncProvider = new AsyncAuditProvider("HdfsAuditProvider", maxQueueSize, maxFlushInterval, hdfsProvider); + boolean isAuditToDbAsync = MiscUtil.getBooleanProperty(props, + DbAuditProvider.AUDIT_DB_IS_ASYNC_PROP, false); - providers.add(asyncProvider); - } else { - providers.add(hdfsProvider); - } - } + if (isAuditToDbAsync) { + int maxQueueSize = MiscUtil.getIntProperty(props, + DbAuditProvider.AUDIT_DB_MAX_QUEUE_SIZE_PROP, + AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT); + int maxFlushInterval = 
MiscUtil.getIntProperty(props, + DbAuditProvider.AUDIT_DB_MAX_FLUSH_INTERVAL_PROP, + AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT); - if(isAuditToKafkaEnabled) { - LOG.info("KafkaAuditProvider is enabled"); - KafkaAuditProvider kafkaProvider = new KafkaAuditProvider(); - kafkaProvider.init(props); - - if( kafkaProvider.isAsync()) { - AsyncAuditProvider asyncProvider = new AsyncAuditProvider("MyKafkaAuditProvider", kafkaProvider.getMaxQueueSize(), kafkaProvider.getMaxFlushInterval(), kafkaProvider); - providers.add(asyncProvider); - } else { - providers.add(kafkaProvider); - } - } + AsyncAuditProvider asyncProvider = new AsyncAuditProvider( + "DbAuditProvider", maxQueueSize, maxFlushInterval, + dbProvider); - if(isAuditToSolrEnabled) { - LOG.info("SolrAuditProvider is enabled"); - SolrAuditProvider solrProvider = new SolrAuditProvider(); - solrProvider.init(props); - - if( solrProvider.isAsync()) { - AsyncAuditProvider asyncProvider = new AsyncAuditProvider("MySolrAuditProvider", solrProvider.getMaxQueueSize(), solrProvider.getMaxFlushInterval(), solrProvider); - providers.add(asyncProvider); - } else { - providers.add(solrProvider); + providers.add(asyncProvider); + } else { + providers.add(dbProvider); + } } - } - if(isAuditToLog4jEnabled) { - Log4jAuditProvider log4jProvider = new Log4jAuditProvider(); + if (isAuditToHdfsEnabled) { + LOG.info("HdfsAuditProvider is enabled"); - boolean isAuditToLog4jAsync = BaseAuditProvider.getBooleanProperty(props, Log4jAuditProvider.AUDIT_LOG4J_IS_ASYNC_PROP, false); + HdfsAuditProvider hdfsProvider = new HdfsAuditProvider(); - if(isAuditToLog4jAsync) { - int maxQueueSize = BaseAuditProvider.getIntProperty(props, Log4jAuditProvider.AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP, AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT); - int maxFlushInterval = BaseAuditProvider.getIntProperty(props, Log4jAuditProvider.AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP, AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT); + boolean isAuditToHdfsAsync = MiscUtil.getBooleanProperty(props, + 
HdfsAuditProvider.AUDIT_HDFS_IS_ASYNC_PROP, false); - AsyncAuditProvider asyncProvider = new AsyncAuditProvider("Log4jAuditProvider", maxQueueSize, maxFlushInterval, log4jProvider); + if (isAuditToHdfsAsync) { + int maxQueueSize = MiscUtil.getIntProperty(props, + HdfsAuditProvider.AUDIT_HDFS_MAX_QUEUE_SIZE_PROP, + AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT); + int maxFlushInterval = MiscUtil + .getIntProperty( + props, + HdfsAuditProvider.AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP, + AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT); + + AsyncAuditProvider asyncProvider = new AsyncAuditProvider( + "HdfsAuditProvider", maxQueueSize, + maxFlushInterval, hdfsProvider); + + providers.add(asyncProvider); + } else { + providers.add(hdfsProvider); + } + } - providers.add(asyncProvider); + if (isAuditToKafkaEnabled) { + LOG.info("KafkaAuditProvider is enabled"); + KafkaAuditProvider kafkaProvider = new KafkaAuditProvider(); + kafkaProvider.init(props); + + if (kafkaProvider.isAsync()) { + AsyncAuditProvider asyncProvider = new AsyncAuditProvider( + "MyKafkaAuditProvider", + kafkaProvider.getMaxQueueSize(), + kafkaProvider.getMaxBatchInterval(), kafkaProvider); + providers.add(asyncProvider); + } else { + providers.add(kafkaProvider); + } + } + + if (isAuditToSolrEnabled) { + LOG.info("SolrAuditProvider is enabled"); + SolrAuditProvider solrProvider = new SolrAuditProvider(); + solrProvider.init(props); + + if (solrProvider.isAsync()) { + AsyncAuditProvider asyncProvider = new AsyncAuditProvider( + "MySolrAuditProvider", + solrProvider.getMaxQueueSize(), + solrProvider.getMaxBatchInterval(), solrProvider); + providers.add(asyncProvider); + } else { + providers.add(solrProvider); + } + } + + if (isAuditToLog4jEnabled) { + Log4jAuditProvider log4jProvider = new Log4jAuditProvider(); + + boolean isAuditToLog4jAsync = MiscUtil.getBooleanProperty( + props, Log4jAuditProvider.AUDIT_LOG4J_IS_ASYNC_PROP, + false); + + if (isAuditToLog4jAsync) { + int maxQueueSize = MiscUtil.getIntProperty(props, + 
Log4jAuditProvider.AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP, + AUDIT_ASYNC_MAX_QUEUE_SIZE_DEFAULT); + int maxFlushInterval = MiscUtil + .getIntProperty( + props, + Log4jAuditProvider.AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP, + AUDIT_ASYNC_MAX_FLUSH_INTERVAL_DEFAULT); + + AsyncAuditProvider asyncProvider = new AsyncAuditProvider( + "Log4jAuditProvider", maxQueueSize, + maxFlushInterval, log4jProvider); + + providers.add(asyncProvider); + } else { + providers.add(log4jProvider); + } + } + if (providers.size() == 0) { + mProvider = getDefaultProvider(); + } else if (providers.size() == 1) { + mProvider = providers.get(0); } else { - providers.add(log4jProvider); + MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider(); + + multiDestProvider.addAuditProviders(providers); + + mProvider = multiDestProvider; } - } - if(providers.size() == 0) { - mProvider = getDefaultProvider(); - } else if(providers.size() == 1) { - mProvider = providers.get(0); - } else { - MultiDestAuditProvider multiDestProvider = new MultiDestAuditProvider(); - - multiDestProvider.addAuditProviders(providers); - - mProvider = multiDestProvider; + mProvider.init(props); + mProvider.start(); } - - mProvider.init(props); - mProvider.start(); JVMShutdownHook jvmShutdownHook = new JVMShutdownHook(mProvider); - Runtime.getRuntime().addShutdownHook(jvmShutdownHook); + Runtime.getRuntime().addShutdownHook(jvmShutdownHook); + } +
+	/**
+	 * Creates the audit provider for one configured destination or queue.
+	 * <p>
+	 * If the property {@code propPrefix + "." + BaseAuditProvider.PROP_CLASS_NAME}
+	 * yields a non-empty class name, that class is loaded reflectively and
+	 * instantiated through its no-argument constructor. Otherwise the provider
+	 * is chosen from the built-in short names: file, hdfs, solr, kafka, db,
+	 * log4j, batch and async. Short names are matched case-insensitively.
+	 *
+	 * @param props        configuration properties the class name is read from
+	 * @param propPrefix   property prefix under which the class name is looked up
+	 * @param providerName short provider name used for the built-in lookup and
+	 *                     in log messages; a {@code null} name matches nothing
+	 * @return the new provider instance, or {@code null} when the configured
+	 *         class cannot be instantiated or the name is not recognised
+	 */
+	private AuditProvider getProviderFromConfig(Properties props,
+			String propPrefix, String providerName) {
+		AuditProvider provider = null;
+		String className = MiscUtil.getStringProperty(props, propPrefix + "."
+				+ BaseAuditProvider.PROP_CLASS_NAME);
+		if (className != null && !className.isEmpty()) {
+			try {
+				provider = (AuditProvider) Class.forName(className)
+						.newInstance();
+			} catch (Exception e) {
+				// Hand the exception to the logger so the root cause (class
+				// not found, no usable constructor, wrong type, ...) is not
+				// lost; the method still returns null in this case.
+				LOG.fatal("Can't instantiate audit class for providerName="
+						+ providerName + ", className=" + className, e);
+			}
+		} else if ("file".equalsIgnoreCase(providerName)) {
+			// Constant-first comparisons keep a null providerName from
+			// throwing; it falls through to the error branch below.
+			provider = new FileAuditDestination();
+		} else if ("hdfs".equalsIgnoreCase(providerName)) {
+			provider = new HDFSAuditDestination();
+		} else if ("solr".equalsIgnoreCase(providerName)) {
+			provider = new SolrAuditProvider();
+		} else if ("kafka".equalsIgnoreCase(providerName)) {
+			provider = new KafkaAuditProvider();
+		} else if ("db".equalsIgnoreCase(providerName)) {
+			provider = new DbAuditProvider();
+		} else if ("log4j".equalsIgnoreCase(providerName)) {
+			provider = new Log4jAuditProvider();
+		} else if ("batch".equalsIgnoreCase(providerName)) {
+			provider = new AuditBatchProcessor();
+		} else if ("async".equalsIgnoreCase(providerName)) {
+			provider = new AuditAsyncQueue();
+		} else {
+			LOG.error("Provider name doesn't have any class associated with it. providerName="
+					+ providerName);
+		}
+		return provider;
 	}
- + private AuditProvider getDefaultProvider() { return new DummyAuditProvider(); } @@ -227,6 +397,6 @@ public class AuditProviderFactory { public void run() { mProvider.waitToComplete(); mProvider.stop(); - } + } } }