// http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
// ----------------------------------------------------------------------
// diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
// new file mode 100644
// index 0000000..7414a7a
// --- /dev/null
// +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DbAuditProvider.java
// @@ -0,0 +1,355 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.ranger.audit.provider;

import java.util.ArrayList;
import java.util.Map;
import java.util.Properties;

import javax.persistence.EntityManager;
import javax.persistence.EntityManagerFactory;
import javax.persistence.EntityTransaction;
import javax.persistence.Persistence;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ranger.audit.dao.DaoManager;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.authorization.hadoop.utils.RangerCredentialProvider;


/**
 * Audit provider that writes audit events to a database through JPA
 * (persistence unit "xa_server").
 *
 * Events are persisted inside a JPA transaction that is committed either after
 * every event (batch size <= 1) or once the configured number of events has
 * accumulated. Events that could not be committed are handed to
 * logFailedEvent().
 *
 * NOTE:
 *  - Instances of this class are not thread-safe.
 */
public class DbAuditProvider extends BaseAuditProvider {

    private static final Log LOG = LogFactory.getLog(DbAuditProvider.class);

    public static final String AUDIT_DB_IS_ASYNC_PROP           = "xasecure.audit.db.is.async";
    public static final String AUDIT_DB_MAX_QUEUE_SIZE_PROP     = "xasecure.audit.db.async.max.queue.size" ;
    public static final String AUDIT_DB_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.db.async.max.flush.interval.ms";

    private static final String AUDIT_DB_BATCH_SIZE_PROP            = "xasecure.audit.db.batch.size" ;
    private static final String AUDIT_DB_RETRY_MIN_INTERVAL_PROP    = "xasecure.audit.db.config.retry.min.interval.ms";
    private static final String AUDIT_JPA_CONFIG_PROP_PREFIX        = "xasecure.audit.jpa.";
    private static final String AUDIT_DB_CREDENTIAL_PROVIDER_FILE   = "xasecure.audit.credential.provider.file";
    private static final String AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS  = "auditDBCred";
    private static final String AUDIT_JPA_JDBC_PASSWORD             = "javax.persistence.jdbc.password";

    private EntityManagerFactory entityManagerFactory;
    private DaoManager           daoManager;

    // number of events per commit; values <= 1 mean "commit after every event"
    private int                             mCommitBatchSize      = 1;
    // minimum time between two attempts to (re)connect after a DB failure
    private int                             mDbRetryMinIntervalMs = 60 * 1000;
    private long                            mLastCommitTime       = System.currentTimeMillis();
    // events persisted in the current transaction but not yet committed
    private final ArrayList<AuditEventBase> mUncommitted          = new ArrayList<AuditEventBase>();
    // properties passed to Persistence.createEntityManagerFactory()
    private Map<String, String>             mDbProperties         = null;
    private long                            mLastDbFailedTime     = 0;

    public DbAuditProvider() {
        LOG.info("DbAuditProvider: creating..");
    }

    /**
     * Reads the provider configuration: JPA properties (keys starting with
     * "xasecure.audit.jpa."), commit batch size, DB retry interval and, when a
     * credential provider file is configured, the JDBC password.
     *
     * Batching is only honoured in async mode; in sync mode the batch size is
     * forced to 1.
     *
     * @param props configuration properties
     */
    @Override
    public void init(Properties props) {
        LOG.info("DbAuditProvider.init()");

        super.init(props);

        mDbProperties         = BaseAuditProvider.getPropertiesWithPrefix(props, AUDIT_JPA_CONFIG_PROP_PREFIX);
        mCommitBatchSize      = BaseAuditProvider.getIntProperty(props, AUDIT_DB_BATCH_SIZE_PROP, 1000);
        mDbRetryMinIntervalMs = BaseAuditProvider.getIntProperty(props, AUDIT_DB_RETRY_MIN_INTERVAL_PROP, 15 * 1000);

        boolean isAsync = BaseAuditProvider.getBooleanProperty(props, AUDIT_DB_IS_ASYNC_PROP, false);

        if(! isAsync) {
            mCommitBatchSize = 1; // Batching not supported in sync mode
        }

        String credentialFile = BaseAuditProvider.getStringProperty(props, AUDIT_DB_CREDENTIAL_PROVIDER_FILE);
        String jdbcPassword   = getCredentialString(credentialFile, AUDIT_DB_CREDENTIAL_PROVIDER_ALIAS);

        // a password from the credential store overrides whatever the JPA properties carried
        if(jdbcPassword != null && !jdbcPassword.isEmpty()) {
            mDbProperties.put(AUDIT_JPA_JDBC_PASSWORD, jdbcPassword);
        }
    }

    /**
     * Persists one audit event. Any failure (no DB connection, persist error,
     * failed commit) results in the event being passed to logFailedEvent().
     *
     * @param event the audit event to store
     */
    @Override
    public void log(AuditEventBase event) {
        LOG.debug("DbAuditProvider.log()");

        boolean isSuccess = false;

        try {
            if(preCreate(event)) {
                // local copy: cleanUp() may null the field
                DaoManager daoMgr = daoManager;

                if(daoMgr != null) {
                    event.persist(daoMgr);

                    isSuccess = postCreate(event);
                }
            }
        } catch(Exception excp) {
            logDbError("DbAuditProvider.log(): failed", excp);
        } finally {
            if(! isSuccess) {
                logFailedEvent(event);
            }
        }
    }

    /** Connects to the database (subject to the retry-interval throttle in init()). */
    @Override
    public void start() {
        LOG.info("DbAuditProvider.start()");

        init();
    }

    /**
     * Closes the entity manager factory and drops the DAO manager.
     *
     * NOTE(review): events still held in the uncommitted batch are not flushed
     * here - confirm that callers invoke flush() before stop().
     */
    @Override
    public void stop() {
        LOG.info("DbAuditProvider.stop()");

        cleanUp();
    }

    @Override
    public void waitToComplete() {
        LOG.info("DbAuditProvider.waitToComplete()");
    }

    /** @return true when events have been persisted but not yet committed */
    @Override
    public boolean isFlushPending() {
        return !mUncommitted.isEmpty();
    }

    /** @return time (ms since epoch) of the last commit attempt, successful or not */
    @Override
    public long getLastFlushTime() {
        return mLastCommitTime;
    }

    /**
     * Commits the pending batch, if any. When the commit fails every event of
     * the batch is reported through logFailedEvent(); the batch is cleared in
     * either case.
     */
    @Override
    public void flush() {
        if(mUncommitted.isEmpty()) {
            return;
        }

        if(! commitTransaction()) {
            for(AuditEventBase evt : mUncommitted) {
                logFailedEvent(evt);
            }
        }

        mUncommitted.clear();
    }

    /**
     * Creates the entity manager factory and DAO manager and forces a DB
     * connection.
     *
     * @return false when the previous DB failure is more recent than the retry
     *         interval (no attempt is made) or when initialization fails
     */
    private synchronized boolean init() {
        long now = System.currentTimeMillis();

        // throttle reconnect attempts after a failure
        if((now - mLastDbFailedTime) < mDbRetryMinIntervalMs) {
            return false;
        }

        LOG.info("DbAuditProvider: init()");

        try {
            entityManagerFactory = Persistence.createEntityManagerFactory("xa_server", mDbProperties);

            daoManager = new DaoManager();
            daoManager.setEntityManagerFactory(entityManagerFactory);

            daoManager.getEntityManager(); // this forces the connection to be made to DB
        } catch(Exception excp) {
            logDbError("DbAuditProvider: DB initalization failed", excp);

            cleanUp();

            return false;
        }

        return true;
    }

    /** Closes the entity manager factory (if open) and resets factory and DAO manager to null. */
    private synchronized void cleanUp() {
        LOG.info("DbAuditProvider: cleanUp()");

        try {
            if(entityManagerFactory != null && entityManagerFactory.isOpen()) {
                entityManagerFactory.close();
            }
        } catch(Exception excp) {
            LOG.error("DbAuditProvider.cleanUp(): failed", excp);
        } finally {
            entityManagerFactory = null;
            daoManager           = null;
        }
    }

    private boolean isDbConnected() {
        EntityManager em = getEntityManager();

        return em != null && em.isOpen();
    }

    /**
     * @return the entity manager of the current DAO manager, or null when there
     *         is no DAO manager or obtaining the entity manager failed (in which
     *         case the provider is cleaned up so that the next insert re-inits)
     */
    private EntityManager getEntityManager() {
        DaoManager daoMgr = daoManager;

        if(daoMgr != null) {
            try {
                return daoMgr.getEntityManager();
            } catch(Exception excp) {
                logDbError("DbAuditProvider.getEntityManager(): failed", excp);

                cleanUp();
            }
        }

        return null;
    }

    /** Best-effort EntityManager.clear(); failures are only logged. */
    private void clearEntityManager() {
        try {
            EntityManager em = getEntityManager();

            if(em != null) {
                em.clear();
            }
        } catch(Exception excp) {
            LOG.warn("DbAuditProvider.clearEntityManager(): failed", excp);
        }
    }

    private EntityTransaction getTransaction() {
        EntityManager em = getEntityManager();

        return em != null ? em.getTransaction() : null;
    }

    private boolean isInTransaction() {
        EntityTransaction trx = getTransaction();

        return trx != null && trx.isActive();
    }

    /**
     * Begins a transaction unless one is already active.
     *
     * @return false when no transaction object could be obtained
     */
    private boolean beginTransaction() {
        EntityTransaction trx = getTransaction();

        if(trx == null) {
            LOG.warn("DbAuditProvider.beginTransaction(): trx is null");

            return false;
        }

        if(!trx.isActive()) {
            trx.begin();
        }

        return true;
    }

    /**
     * Commits the active transaction.
     *
     * On any failure - including "no active transaction" - the error is logged,
     * a still-active transaction is rolled back and the provider is cleaned up
     * so that the next insert re-initializes the DB connection. The last-commit
     * timestamp is updated and the entity manager cleared in every case.
     *
     * @return true only when the commit succeeded
     */
    private boolean commitTransaction() {
        boolean           ret = false;
        EntityTransaction trx = null;

        try {
            trx = getTransaction();

            if(trx != null && trx.isActive()) {
                trx.commit();

                ret = true;
            } else {
                // same handling as a real commit failure, without using throw/catch as control flow
                handleCommitFailure(trx, new IllegalStateException("trx is null or not active"));
            }
        } catch(Exception excp) {
            handleCommitFailure(trx, excp);
        } finally {
            mLastCommitTime = System.currentTimeMillis();

            clearEntityManager();
        }

        return ret;
    }

    /** Logs a commit failure, rolls back the transaction if it is still active and resets the DB state. */
    private void handleCommitFailure(EntityTransaction trx, Exception excp) {
        logDbError("DbAuditProvider.commitTransaction(): failed", excp);

        try {
            // a failed commit can leave the transaction active; release it before closing the factory
            if(trx != null && trx.isActive()) {
                trx.rollback();
            }
        } catch(Exception rollbackExcp) {
            LOG.warn("DbAuditProvider.commitTransaction(): rollback failed", rollbackExcp);
        }

        cleanUp(); // so that next insert will try to init()
    }

    /**
     * Makes sure a DB connection and an active transaction exist before an
     * event is persisted.
     *
     * @return true when the event can be persisted
     */
    private boolean preCreate(AuditEventBase event) {
        boolean ret = true;

        if(!isDbConnected()) {
            ret = init();
        }

        if(ret && !isInTransaction()) {
            ret = beginTransaction();
        }

        return ret;
    }

    /**
     * Called after an event was persisted: commits immediately when batching is
     * off, otherwise adds the event to the batch and commits when the batch is
     * full. When a batch commit fails, all batched events except the current
     * one are reported as failed here; the current event is reported by the
     * caller, based on the return value.
     *
     * @return false when a commit was attempted and failed
     */
    private boolean postCreate(AuditEventBase event) {
        if(mCommitBatchSize <= 1) {
            return commitTransaction();
        }

        boolean ret = true;

        mUncommitted.add(event);

        if((mUncommitted.size() % mCommitBatchSize) == 0) {
            ret = commitTransaction();

            if(! ret) {
                for(AuditEventBase evt : mUncommitted) {
                    if(evt != event) {
                        logFailedEvent(evt);
                    }
                }
            }

            mUncommitted.clear();
        }

        return ret;
    }

    /**
     * Logs a DB error and records the failure time. The failure time is only
     * advanced when the previous recorded failure is older than the retry
     * interval.
     */
    private void logDbError(String msg, Exception excp) {
        long now = System.currentTimeMillis();

        if((now - mLastDbFailedTime) > mDbRetryMinIntervalMs) {
            mLastDbFailedTime = now;
        }

        LOG.warn(msg, excp);
    }

    /**
     * Looks up a credential through RangerCredentialProvider.
     *
     * @param url   credential provider location; null yields null
     * @param alias credential alias; null yields null
     * @return the credential as a String, or null when it is not available
     */
    private String getCredentialString(String url,String alias) {
        if(url == null || alias == null) {
            return null;
        }

        char[] cred = RangerCredentialProvider.getInstance().getCredentialString(url,alias);

        return cred != null ? new String(cred) : null;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/DebugTracer.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DebugTracer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DebugTracer.java new file mode 100644 index 0000000..7396fd0 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DebugTracer.java @@ -0,0 +1,28 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ranger.audit.provider; + +public interface DebugTracer { + void debug(String msg); + void debug(String msg, Throwable excp); + void info(String msg); + void info(String msg, Throwable excp); + void warn(String msg); + void warn(String msg, Throwable excp); + void error(String msg); + void error(String msg, Throwable excp); +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java new file mode 100644 index 0000000..a6e3ef1 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/DummyAuditProvider.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ranger.audit.provider; + +import java.util.Properties; + +import org.apache.ranger.audit.model.AuditEventBase; + + +public class DummyAuditProvider implements AuditProvider { + @Override + public void init(Properties prop) { + // intentionally left empty + } + + @Override + public void log(AuditEventBase event) { + // intentionally left empty + } + + @Override + public void start() { + // intentionally left empty + } + + @Override + public void stop() { + // intentionally left empty + } + + @Override + public void waitToComplete() { + // intentionally left empty + } + + @Override + public boolean isFlushPending() { + return false; + } + + @Override + public long getLastFlushTime() { + return 0; + } + + @Override + public void flush() { + // intentionally left empty + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java new file mode 100644 index 0000000..cdc4d53 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LocalFileLogBuffer.java @@ -0,0 +1,687 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ranger.audit.provider; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileFilter; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.UnsupportedEncodingException; +import java.io.Writer; +import java.security.PrivilegedAction; +import java.util.Arrays; +import java.util.Comparator; +import java.util.TreeSet; + +import org.apache.hadoop.security.UserGroupInformation; + + +public class LocalFileLogBuffer<T> implements LogBuffer<T> { + private String mDirectory = null; + private String mFile = null; + private int mFlushIntervalSeconds = 1 * 60; + private int mFileBufferSizeBytes = 8 * 1024; + private String mEncoding = null; + private boolean mIsAppend = true; + private int mRolloverIntervalSeconds = 10 * 60; + private String mArchiveDirectory = null; + private int mArchiveFileCount = 10; + private DebugTracer mLogger = null; + + private Writer mWriter = null; + private String mBufferFilename = null; + private long mNextRolloverTime = 0; + private long mNextFlushTime = 0; + private int mFileOpenRetryIntervalInMs = 60 * 1000; + private long mNextFileOpenRetryTime = 0; + + private DestinationDispatcherThread<T> mDispatcherThread = null; + + public LocalFileLogBuffer(DebugTracer tracer) { + mLogger = tracer; + } + + public String getDirectory() { + 
return mDirectory; + } + + public void setDirectory(String directory) { + mDirectory = directory; + } + + public String getFile() { + return mFile; + } + + public void setFile(String file) { + mFile = file; + } + + public int getFileBufferSizeBytes() { + return mFileBufferSizeBytes; + } + + public void setFileBufferSizeBytes(int fileBufferSizeBytes) { + mFileBufferSizeBytes = fileBufferSizeBytes; + } + + public int getFlushIntervalSeconds() { + return mFlushIntervalSeconds; + } + + public void setFlushIntervalSeconds(int flushIntervalSeconds) { + mFlushIntervalSeconds = flushIntervalSeconds; + } + + public String getEncoding() { + return mEncoding; + } + + public void setEncoding(String encoding) { + mEncoding = encoding; + } + + public boolean getIsAppend() { + return mIsAppend; + } + + public void setIsAppend(boolean isAppend) { + mIsAppend = isAppend; + } + + public int getRolloverIntervalSeconds() { + return mRolloverIntervalSeconds; + } + + public void setRolloverIntervalSeconds(int rolloverIntervalSeconds) { + mRolloverIntervalSeconds = rolloverIntervalSeconds; + } + + public String getArchiveDirectory() { + return mArchiveDirectory; + } + + public void setArchiveDirectory(String archiveDirectory) { + mArchiveDirectory = archiveDirectory; + } + + public int getArchiveFileCount() { + return mArchiveFileCount; + } + + public void setArchiveFileCount(int archiveFileCount) { + mArchiveFileCount = archiveFileCount; + } + + + @Override + public void start(LogDestination<T> destination) { + mLogger.debug("==> LocalFileLogBuffer.start()"); + + mDispatcherThread = new DestinationDispatcherThread<T>(this, destination, mLogger); + + mDispatcherThread.start(); + + mLogger.debug("<== LocalFileLogBuffer.start()"); + } + + @Override + public void stop() { + mLogger.debug("==> LocalFileLogBuffer.stop()"); + + DestinationDispatcherThread<T> dispatcherThread = mDispatcherThread; + mDispatcherThread = null; + + if(dispatcherThread != null && dispatcherThread.isAlive()) { + 
dispatcherThread.stopThread(); + + try { + dispatcherThread.join(); + } catch (InterruptedException e) { + mLogger.warn("LocalFileLogBuffer.stop(): failed in waiting for DispatcherThread", e); + } + } + + closeFile(); + + mLogger.debug("<== LocalFileLogBuffer.stop()"); + } + + @Override + public boolean isAvailable() { + return mWriter != null; + } + + @Override + public boolean add(T log) { + boolean ret = false; + + String msg = MiscUtil.stringify(log); + + if(msg.contains(MiscUtil.LINE_SEPARATOR)) { + msg = msg.replace(MiscUtil.LINE_SEPARATOR, MiscUtil.ESCAPE_STR + MiscUtil.LINE_SEPARATOR); + } + + synchronized(this) { + checkFileStatus(); + + Writer writer = mWriter; + + if(writer != null) { + try { + writer.write(msg + MiscUtil.LINE_SEPARATOR); + + if(mFileBufferSizeBytes == 0) { + writer.flush(); + } + + ret = true; + } catch(IOException excp) { + mLogger.warn("LocalFileLogBuffer.add(): write failed", excp); + + closeFile(); + } + } + } + + return ret; + } + + @Override + public boolean isEmpty() { + return mDispatcherThread == null || mDispatcherThread.isIdle(); + } + + private synchronized void openFile() { + mLogger.debug("==> LocalFileLogBuffer.openFile()"); + + long now = System.currentTimeMillis(); + + closeFile(); + + if(mNextFileOpenRetryTime <= now) { + try { + mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000L)); + + long startTime = MiscUtil.getRolloverStartTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000L)); + + mBufferFilename = MiscUtil.replaceTokens(mDirectory + File.separator + mFile, startTime); + + MiscUtil.createParents(new File(mBufferFilename)); + + FileOutputStream ostream = null; + try { + ostream = new FileOutputStream(mBufferFilename, mIsAppend); + } catch(Exception excp) { + mLogger.warn("LocalFileLogBuffer.openFile(): failed to open file " + mBufferFilename, excp); + } + + if(ostream != null) { + mWriter = createWriter(ostream); + + if(mWriter != null) { + 
mLogger.debug("LocalFileLogBuffer.openFile(): opened file " + mBufferFilename); + + mNextFlushTime = System.currentTimeMillis() + (mFlushIntervalSeconds * 1000L); + } else { + mLogger.warn("LocalFileLogBuffer.openFile(): failed to open file for write " + mBufferFilename); + + mBufferFilename = null; + } + } + } finally { + if(mWriter == null) { + mNextFileOpenRetryTime = now + mFileOpenRetryIntervalInMs; + } + } + } + + mLogger.debug("<== LocalFileLogBuffer.openFile()"); + } + + private synchronized void closeFile() { + mLogger.debug("==> LocalFileLogBuffer.closeFile()"); + + Writer writer = mWriter; + + mWriter = null; + + if(writer != null) { + try { + writer.flush(); + writer.close(); + } catch(IOException excp) { + mLogger.warn("LocalFileLogBuffer: failed to close file " + mBufferFilename, excp); + } + + if(mDispatcherThread != null) { + mDispatcherThread.addLogfile(mBufferFilename); + } + } + + mLogger.debug("<== LocalFileLogBuffer.closeFile()"); + } + + private void rollover() { + mLogger.debug("==> LocalFileLogBuffer.rollover()"); + + closeFile(); + + openFile(); + + mLogger.debug("<== LocalFileLogBuffer.rollover()"); + } + + private void checkFileStatus() { + long now = System.currentTimeMillis(); + + if(now > mNextRolloverTime) { + rollover(); + } else if(mWriter == null) { + openFile(); + } else if(now > mNextFlushTime) { + try { + mNextFlushTime = now + (mFlushIntervalSeconds * 1000L); + + mWriter.flush(); + } catch (IOException excp) { + mLogger.warn("LocalFileLogBuffer: failed to flush to file " + mBufferFilename, excp); + } + } + } + + private Writer createWriter(OutputStream os ) { + Writer writer = null; + + if(os != null) { + if(mEncoding != null) { + try { + writer = new OutputStreamWriter(os, mEncoding); + } catch(UnsupportedEncodingException excp) { + mLogger.warn("LocalFileLogBuffer: failed to create output writer for file " + mBufferFilename, excp); + } + } + + if(writer == null) { + writer = new OutputStreamWriter(os); + } + + 
if(mFileBufferSizeBytes > 0 && writer != null) { + writer = new BufferedWriter(writer, mFileBufferSizeBytes); + } + } + + return writer; + } + + boolean isCurrentFilename(String filename) { + return mBufferFilename != null && filename != null && filename.equals(mBufferFilename); + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + sb.append("LocalFileLogBuffer {"); + sb.append("Directory=").append(mDirectory).append("; "); + sb.append("File=").append(mFile).append("; "); + sb.append("RolloverIntervaSeconds=").append(mRolloverIntervalSeconds).append("; "); + sb.append("ArchiveDirectory=").append(mArchiveDirectory).append("; "); + sb.append("ArchiveFileCount=").append(mArchiveFileCount); + sb.append("}"); + + return sb.toString(); + } + +} + +class DestinationDispatcherThread<T> extends Thread { + private TreeSet<String> mCompletedLogfiles = new TreeSet<String>(); + private boolean mStopThread = false; + private LocalFileLogBuffer<T> mFileLogBuffer = null; + private LogDestination<T> mDestination = null; + private DebugTracer mLogger = null; + + private String mCurrentLogfile = null; + private BufferedReader mReader = null; + + public DestinationDispatcherThread(LocalFileLogBuffer<T> fileLogBuffer, LogDestination<T> destination, DebugTracer tracer) { + super(DestinationDispatcherThread.class.getSimpleName() + "-" + System.currentTimeMillis()); + + mLogger = tracer; + + mFileLogBuffer = fileLogBuffer; + mDestination = destination; + + setDaemon(true); + } + + public void addLogfile(String filename) { + mLogger.debug("==> DestinationDispatcherThread.addLogfile(" + filename + ")"); + + if(filename != null) { + synchronized(mCompletedLogfiles) { + mCompletedLogfiles.add(filename); + mCompletedLogfiles.notify(); + } + } + + mLogger.debug("<== DestinationDispatcherThread.addLogfile(" + filename + ")"); + } + + public void stopThread() { + mStopThread = true; + } + + public boolean isIdle() { + synchronized(mCompletedLogfiles) { + 
return mCompletedLogfiles.isEmpty() && mCurrentLogfile == null; + } + } + + @Override + public void run() { + UserGroupInformation loginUser = null; + + try { + loginUser = UserGroupInformation.getLoginUser(); + } catch (IOException excp) { + mLogger.error("DestinationDispatcherThread.run(): failed to get login user details. Audit files will not be sent to HDFS destination", excp); + } + + if(loginUser == null) { + mLogger.error("DestinationDispatcherThread.run(): failed to get login user. Audit files will not be sent to HDFS destination"); + + return; + } + + loginUser.doAs(new PrivilegedAction<Integer>() { + @Override + public Integer run() { + doRun(); + + return 0; + } + }); + } + + private void doRun() { + init(); + + mDestination.start(); + + long pollIntervalInMs = 1000L; + + while(! mStopThread) { + synchronized(mCompletedLogfiles) { + while(mCompletedLogfiles.isEmpty() && !mStopThread) { + try { + mCompletedLogfiles.wait(pollIntervalInMs); + } catch(InterruptedException excp) { + throw new RuntimeException("DestinationDispatcherThread.run(): failed to wait for log file", excp); + } + } + + mCurrentLogfile = mCompletedLogfiles.pollFirst(); + } + + if(mCurrentLogfile != null) { + sendCurrentFile(); + } + } + + mDestination.stop(); + } + + private void init() { + mLogger.debug("==> DestinationDispatcherThread.init()"); + + String dirName = MiscUtil.replaceTokens(mFileLogBuffer.getDirectory(), 0); + + if(dirName != null) { + File directory = new File(dirName); + + if(directory.exists() && directory.isDirectory()) { + File[] files = directory.listFiles(); + + if(files != null) { + for(File file : files) { + if(file.exists() && file.isFile() && file.canRead()) { + String filename = file.getAbsolutePath(); + if(! 
mFileLogBuffer.isCurrentFilename(filename)) { + addLogfile(filename); + } + } + } + } + } + } + + mLogger.debug("<== DestinationDispatcherThread.init()"); + } + + private boolean sendCurrentFile() { + mLogger.debug("==> DestinationDispatcherThread.sendCurrentFile()"); + + boolean ret = false; + + long destinationPollIntervalInMs = 1000L; + + openCurrentFile(); + + while(!mStopThread) { + String log = getNextStringifiedLog(); + + if(log == null) { // reached end-of-file + ret = true; + + break; + } + + // loop until log is sent successfully + while(!mStopThread && !mDestination.sendStringified(log)) { + try { + Thread.sleep(destinationPollIntervalInMs); + } catch(InterruptedException excp) { + throw new RuntimeException("LocalFileLogBuffer.sendCurrentFile(" + mCurrentLogfile + "): failed while waiting for destination to be available", excp); + } + } + } + + closeCurrentFile(); + + if(!mStopThread) { + mDestination.flush(); + archiveCurrentFile(); + } + + mLogger.debug("<== DestinationDispatcherThread.sendCurrentFile()"); + + return ret; + } + + private String getNextStringifiedLog() { + String log = null; + + if(mReader != null) { + try { + while(true) { + String line = mReader.readLine(); + + if(line == null) { // reached end-of-file + break; + } + + if(line.endsWith(MiscUtil.ESCAPE_STR)) { + line = line.substring(0, line.length() - MiscUtil.ESCAPE_STR.length()); + + if(log == null) { + log = line; + } else { + log += MiscUtil.LINE_SEPARATOR; + log += line; + } + + continue; + } else { + if(log == null) { + log = line; + } else { + log += line; + } + break; + } + } + } catch (IOException excp) { + mLogger.warn("getNextStringifiedLog.getNextLog(): failed to read from file " + mCurrentLogfile, excp); + } + } + + return log; + } + + private void openCurrentFile() { + mLogger.debug("==> openCurrentFile(" + mCurrentLogfile + ")"); + + if(mCurrentLogfile != null) { + try { + FileInputStream inStr = new FileInputStream(mCurrentLogfile); + + InputStreamReader strReader = 
createReader(inStr); + + if(strReader != null) { + mReader = new BufferedReader(strReader); + } + } catch(FileNotFoundException excp) { + mLogger.warn("openNextFile(): error while opening file " + mCurrentLogfile, excp); + } + } + + mLogger.debug("<== openCurrentFile(" + mCurrentLogfile + ")"); + } + + private void closeCurrentFile() { + mLogger.debug("==> closeCurrentFile(" + mCurrentLogfile + ")"); + + if(mReader != null) { + try { + mReader.close(); + } catch(IOException excp) { + // ignore + } + } + mReader = null; + + mLogger.debug("<== closeCurrentFile(" + mCurrentLogfile + ")"); + } + + private void archiveCurrentFile() { + if(mCurrentLogfile != null) { + File logFile = new File(mCurrentLogfile); + String archiveDirName = MiscUtil.replaceTokens(mFileLogBuffer.getArchiveDirectory(), 0); + String archiveFilename = archiveDirName + File.separator +logFile.getName(); + + try { + if(logFile.exists()) { + File archiveFile = new File(archiveFilename); + + MiscUtil.createParents(archiveFile); + + if(! logFile.renameTo(archiveFile)) { + // TODO: renameTo() does not work in all cases. in case of failure, copy the file contents to the destination and delete the file + mLogger.warn("archiving failed to move file: " + mCurrentLogfile + " ==> " + archiveFilename); + } + + File archiveDir = new File(archiveDirName); + File[] files = archiveDir.listFiles(new FileFilter() { + @Override + public boolean accept(File f) { + return f.isFile(); + } + }); + + int numOfFilesToDelete = files == null ? 0 : (files.length - mFileLogBuffer.getArchiveFileCount()); + + if(numOfFilesToDelete > 0) { + Arrays.sort(files, new Comparator<File>() { + @Override + public int compare(File f1, File f2) { + return (int)(f1.lastModified() - f2.lastModified()); + } + }); + + for(int i = 0; i < numOfFilesToDelete; i++) { + if(! 
files[i].delete()) { + mLogger.warn("archiving failed to delete file: " + files[i].getAbsolutePath()); + } + } + } + } + } catch(Exception excp) { + mLogger.warn("archiveCurrentFile(): faile to move " + mCurrentLogfile + " to archive location " + archiveFilename, excp); + } + } + mCurrentLogfile = null; + } + + public InputStreamReader createReader(InputStream iStr) { + InputStreamReader reader = null; + + if(iStr != null) { + String encoding = mFileLogBuffer.getEncoding(); + + if(encoding != null) { + try { + reader = new InputStreamReader(iStr, encoding); + } catch(UnsupportedEncodingException excp) { + mLogger.warn("createReader(): failed to create input reader.", excp); + } + } + + if(reader == null) { + reader = new InputStreamReader(iStr); + } + } + + return reader; + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + sb.append("DestinationDispatcherThread {"); + sb.append("ThreadName=").append(this.getName()).append("; "); + sb.append("CompletedLogfiles.size()=").append(mCompletedLogfiles.size()).append("; "); + sb.append("StopThread=").append(mStopThread).append("; "); + sb.append("CurrentLogfile=").append(mCurrentLogfile); + sb.append("}"); + + return sb.toString(); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java new file mode 100644 index 0000000..0d0809a --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jAuditProvider.java @@ -0,0 +1,100 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ranger.audit.provider; + +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.model.AuditEventBase; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + + +/** + * Audit provider that writes each audit event to the "xaaudit."-prefixed logger at INFO level, + * rendered as JSON when a Gson instance could be created and via toString() otherwise. + */ +public class Log4jAuditProvider extends BaseAuditProvider { + + private static final Log LOG = LogFactory.getLog(Log4jAuditProvider.class); + private static final Log AUDITLOG = LogFactory.getLog("xaaudit." 
+ Log4jAuditProvider.class.getName()); + + public static final String AUDIT_LOG4J_IS_ASYNC_PROP = "xasecure.audit.log4j.is.async"; + public static final String AUDIT_LOG4J_MAX_QUEUE_SIZE_PROP = "xasecure.audit.log4j.async.max.queue.size" ; + public static final String AUDIT_LOG4J_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.log4j.async.max.flush.interval.ms"; + + // JSON serializer for audit events; stays null when init() fails to create it + private Gson mGsonBuilder = null; + + public Log4jAuditProvider() { + LOG.info("Log4jAuditProvider: creating.."); + } + + /** + * Runs the base-class initialization, then creates the Gson serializer. + * A failure to create it is logged and tolerated; log() then falls back to event.toString(). + */ + @Override + public void init(Properties props) { + LOG.info("Log4jAuditProvider.init()"); + + super.init(props); + + try { + mGsonBuilder = new GsonBuilder().setDateFormat("yyyyMMdd-HH:mm:ss.SSS-Z").create(); + } catch(Throwable excp) { + LOG.warn("Log4jAuditProvider.init(): failed to create GsonBuilder object. events will be formatted using toString(), instead of Json", excp); + } + } + + /** + * Writes the event to the audit logger at INFO level. + * Nothing is written when INFO is disabled on that logger or when the event is null. + */ + @Override + public void log(AuditEventBase event) { + if(! AUDITLOG.isInfoEnabled()) + return; + + if(event != null) { + String eventStr = mGsonBuilder != null ? 
mGsonBuilder.toJson(event) : event.toString(); + + AUDITLOG.info(eventStr); + } + } + + @Override + public void start() { + // intentionally left empty + } + + @Override + public void stop() { + // intentionally left empty + } + + @Override + public void waitToComplete() { + // intentionally left empty + } + + @Override + public boolean isFlushPending() { + // log() hands each event straight to the logger, so this class never holds pending events + return false; + } + + @Override + public long getLastFlushTime() { + return 0; + } + + @Override + public void flush() { + // intentionally left empty + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jTracer.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jTracer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jTracer.java new file mode 100644 index 0000000..bdb1a47 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/Log4jTracer.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ranger.audit.provider; + +import org.apache.commons.logging.Log; + +public class Log4jTracer implements DebugTracer { + private Log mLogger = null; + + public Log4jTracer(Log logger) { + mLogger = logger; + } + + public void debug(String msg) { + mLogger.debug(msg); + } + + public void debug(String msg, Throwable excp) { + mLogger.debug(msg, excp); + } + + public void info(String msg) { + mLogger.info(msg); + } + + public void info(String msg, Throwable excp) { + mLogger.info(msg, excp); + } + + public void warn(String msg) { + mLogger.warn(msg); + } + + public void warn(String msg, Throwable excp) { + mLogger.warn(msg, excp); + } + + public void error(String msg) { + mLogger.error(msg); + } + + public void error(String msg, Throwable excp) { + mLogger.error(msg, excp); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogBuffer.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogBuffer.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogBuffer.java new file mode 100644 index 0000000..aebef1b --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogBuffer.java @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ranger.audit.provider; + + +/** + * A buffer that accepts log entries of type T and is started against a LogDestination of the same type. + * NOTE(review): how and when buffered entries reach the destination is decided by the + * implementations, which this interface does not show - confirm there. + */ +public interface LogBuffer<T> { + /** Starts the buffer, handing it the destination supplied by the caller. */ + public void start(LogDestination<T> destination); + + /** Stops the buffer. */ + public void stop(); + + /** NOTE(review): presumably reports whether add() can currently accept entries - confirm against implementations. */ + boolean isAvailable(); + + /** NOTE(review): presumably true when no entries are waiting in the buffer - confirm against implementations. */ + public boolean isEmpty(); + + /** Offers one entry to the buffer; presumably returns true when the entry was accepted - confirm against implementations. 
*/ + public boolean add(T log); +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java new file mode 100644 index 0000000..44e94ad --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/LogDestination.java @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ranger.audit.provider; + + +/** + * A sink for log entries of type T, accepted either as objects or as already-stringified text. + * NOTE(review): the meaning of the boolean results is not stated by this interface; the notes below + * describe what they presumably mean - confirm against the implementations. + */ +public interface LogDestination<T> { + /** Starts the destination. */ + public void start(); + + /** Stops the destination. */ + public void stop(); + + /** NOTE(review): presumably reports whether the destination can currently accept entries - confirm. */ + boolean isAvailable(); + + /** Sends one entry; presumably returns true when the entry was written - confirm. */ + public boolean send(T log); + + /** Sends one entry that the caller has already converted to text; presumably returns true when it was written - confirm. */ + public boolean sendStringified(String log); + + /** Flushes the destination; presumably returns true when the flush succeeded - confirm. 
*/ + public boolean flush(); +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java new file mode 100644 index 0000000..17230b2 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MiscUtil.java @@ -0,0 +1,250 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ranger.audit.provider; + +import java.io.File; +import java.net.InetAddress; +import java.rmi.dgc.VMID; +import java.text.SimpleDateFormat; +import java.util.UUID; + +import org.apache.log4j.helpers.LogLog; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; + +/** + * Static helpers shared by the audit providers: %token% expansion for file names, + * rollover-time arithmetic, lenient integer parsing and JSON stringification. + */ +public class MiscUtil { + public static final String TOKEN_START = "%"; + public static final String TOKEN_END = "%"; + public static final String TOKEN_HOSTNAME = "hostname"; + public static final String TOKEN_APP_TYPE = "app-type"; + public static final String TOKEN_JVM_INSTANCE = "jvm-instance"; + public static final String TOKEN_TIME = "time:"; + public static final String TOKEN_PROPERTY = "property:"; + public static final String TOKEN_ENV = "env:"; + public static final String ESCAPE_STR = "\\"; + + static VMID sJvmID = new VMID(); + + public static String LINE_SEPARATOR = System.getProperty("line.separator"); + + private static Gson sGsonBuilder = null; + private static String sApplicationType = null; + + static { + try { + sGsonBuilder = new GsonBuilder().setDateFormat("yyyy-MM-dd HH:mm:ss.SSS").create(); + } catch(Throwable excp) { + LogLog.warn("failed to create GsonBuilder object. 
stringify() will return obj.toString(), instead of Json", excp); + } + } + + /** + * Expands %token% placeholders in str. Recognized tokens are hostname, app-type, jvm-instance, + * property:NAME (system property), env:NAME (environment variable) and time:PATTERN + * (SimpleDateFormat pattern applied to the given time). Unrecognized tokens, and tokens whose + * value is null, are replaced with the empty string. + * + * @param str text to expand; may be null + * @param time epoch milliseconds used for time: tokens; a value <= 0 means the current time + * @return the expanded text, or null when str is null + */ + public static String replaceTokens(String str, long time) { + if(str == null) { + return str; + } + + if(time <= 0) { + time = System.currentTimeMillis(); + } + + for(int startPos = 0; startPos < str.length(); ) { + int tagStartPos = str.indexOf(TOKEN_START, startPos); + + if(tagStartPos == -1) { + break; + } + + int tagEndPos = str.indexOf(TOKEN_END, tagStartPos + TOKEN_START.length()); + + if(tagEndPos == -1) { + break; + } + + String tag = str.substring(tagStartPos, tagEndPos+TOKEN_END.length()); + String token = tag.substring(TOKEN_START.length(), tag.lastIndexOf(TOKEN_END)); + String val = ""; + + if(token != null) { + if(token.equals(TOKEN_HOSTNAME)) { + val = getHostname(); + } else if(token.equals(TOKEN_APP_TYPE)) { + val = getApplicationType(); + } else if(token.equals(TOKEN_JVM_INSTANCE)) { + val = getJvmInstanceId(); + } else if(token.startsWith(TOKEN_PROPERTY)) { + String propertyName = token.substring(TOKEN_PROPERTY.length()); + + val = getSystemProperty(propertyName); + } else if(token.startsWith(TOKEN_ENV)) { + String envName = token.substring(TOKEN_ENV.length()); + + val = getEnv(envName); + } else if(token.startsWith(TOKEN_TIME)) { + String dtFormat = token.substring(TOKEN_TIME.length()); + + val = getFormattedTime(time, dtFormat); + } + } + + if(val == null) { + val = ""; + } + + str = str.substring(0, tagStartPos) + val + str.substring(tagEndPos + TOKEN_END.length()); + // resume scanning after the inserted value so that it is not expanded again + startPos = tagStartPos + val.length(); + } + + return str; + } + + /** + * Returns the local host name, or null when the lookup fails (the failure is logged). + */ + public static String getHostname() { + String ret = null; + + try { + ret = InetAddress.getLocalHost().getHostName(); + } catch (Exception excp) { + LogLog.warn("getHostname()", excp); + } + + return ret; + } + + public static void setApplicationType(String applicationType) { + sApplicationType = applicationType; + } + + public static String getApplicationType() { + return sApplicationType; + } + + /** + * Returns a non-negative decimal identifier derived from the hash code of this JVM's VMID string. 
+ */ + public static String getJvmInstanceId() { 
+ // widen to long before abs(): Math.abs(Integer.MIN_VALUE) would stay negative + String ret = Long.toString(Math.abs((long) sJvmID.toString().hashCode())); + + return ret; + } + + /** + * Returns the named system property, or null when the name is null, the property is unset + * or the lookup fails (the failure is logged). + */ + public static String getSystemProperty(String propertyName) { + String ret = null; + + try { + ret = propertyName != null ? System.getProperty(propertyName) : null; + } catch (Exception excp) { + LogLog.warn("getSystemProperty(" + propertyName + ") failed", excp); + } + + return ret; + } + + /** + * Returns the named environment variable, or null when the name is null, the variable is unset + * or the lookup fails (the failure is logged). + */ + public static String getEnv(String envName) { + String ret = null; + + try { + ret = envName != null ? System.getenv(envName) : null; + } catch (Exception excp) { + LogLog.warn("getenv(" + envName + ") failed", excp); + } + + return ret; + } + + /** + * Formats the given epoch-millisecond time with a SimpleDateFormat pattern. + * Returns null when the pattern is rejected or formatting fails (the failure is logged). + */ + public static String getFormattedTime(long time, String format) { + String ret = null; + + try { + SimpleDateFormat sdf = new SimpleDateFormat(format); + + ret = sdf.format(time); + } catch (Exception excp) { + LogLog.warn("SimpleDateFormat.format() failed: " + format, excp); + } + + return ret; + } + + /** + * Creates the missing parent directories of the given file; a failure is only logged. + * Does nothing when file is null or has no parent. + */ + public static void createParents(File file) { + if(file != null) { + String parentName = file.getParent(); + + if (parentName != null) { + File parentDir = new File(parentName); + + if(!parentDir.exists()) { + if(! parentDir.mkdirs()) { + LogLog.warn("createParents(): failed to create " + parentDir.getAbsolutePath()); + } + } + } + } + } + + /** + * Computes the next rollover time in epoch milliseconds. + * With no previous rollover time (lastRolloverTime <= 0) the result is now + interval. + * A previous time that is still in the future is returned unchanged. Otherwise the result is + * moved back from now + interval so that it lies a whole number of intervals after + * lastRolloverTime; this branch uses interval as a modulus, so interval must not be 0. + * + * @param lastRolloverTime previously computed rollover time, or <= 0 when there is none + * @param interval rollover interval in milliseconds + */ + public static long getNextRolloverTime(long lastRolloverTime, long interval) { + long now = System.currentTimeMillis() / 1000 * 1000; // round to second + + if(lastRolloverTime <= 0) { + // should this be set to the next multiple-of-the-interval from start of the day? + return now + interval; + } else if(lastRolloverTime <= now) { + long nextRolloverTime = now + interval; + + // keep it at 'interval' boundary + long trimInterval = (nextRolloverTime - lastRolloverTime) % interval; + + return nextRolloverTime - trimInterval; + } else { + return lastRolloverTime; + } + } + + /** + * Returns the start of the period that ends at nextRolloverTime, i.e. nextRolloverTime - interval; + * when nextRolloverTime does not exceed interval the current time is returned instead. + */ + public static long getRolloverStartTime(long nextRolloverTime, long interval) { + return (nextRolloverTime <= interval) ? 
System.currentTimeMillis() : nextRolloverTime - interval; + } + + /** + * Parses str as a decimal int. + * + * @return the parsed value, or defValue when str is null or is not a valid integer + */ + public static int parseInteger(String str, int defValue) { + int ret = defValue; + + if(str != null) { + try { + ret = Integer.parseInt(str); + } catch(NumberFormatException excp) { + // not a valid integer: keep defValue + } + } + + return ret; + } + + public static String generateUniqueId() { + return UUID.randomUUID().toString(); + } + + /** + * Converts a log entry to text: a String is returned as is, any other object is rendered as + * JSON when the Gson instance is available and with toString() otherwise. + * + * @return the text form, or null when log is null + */ + public static <T> String stringify(T log) { + String ret = null; + + if(log != null) { + if(log instanceof String) { + ret = (String)log; + } else if(MiscUtil.sGsonBuilder != null) { + ret = MiscUtil.sGsonBuilder.toJson(log); + } else { + ret = log.toString(); + } + } + + return ret; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java new file mode 100644 index 0000000..0c2bca6 --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/MultiDestAuditProvider.java @@ -0,0 +1,161 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ranger.audit.provider; + +import java.util.ArrayList; +import java.util.List; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.model.HBaseAuditEvent; +import org.apache.ranger.audit.model.HdfsAuditEvent; +import org.apache.ranger.audit.model.HiveAuditEvent; +import org.apache.ranger.audit.model.KnoxAuditEvent; +import org.apache.ranger.audit.model.StormAuditEvent; + + +public class MultiDestAuditProvider extends BaseAuditProvider { + + private static final Log LOG = LogFactory.getLog(MultiDestAuditProvider.class); + + protected List<AuditProvider> mProviders = new ArrayList<AuditProvider>(); + + + public MultiDestAuditProvider() { + LOG.info("MultiDestAuditProvider: creating.."); + } + + public MultiDestAuditProvider(AuditProvider provider) { + addAuditProvider(provider); + } + + @Override + public void init(Properties props) { + LOG.info("MultiDestAuditProvider.init()"); + + super.init(props); + + for(AuditProvider provider : mProviders) { + try { + provider.init(props); + } catch(Throwable excp) { + LOG.info("MultiDestAuditProvider.init(): failed" + provider.getClass().getCanonicalName() + ")"); + } + } + } + + public void addAuditProvider(AuditProvider provider) { + if(provider != null) { + LOG.info("MultiDestAuditProvider.addAuditProvider(providerType=" + provider.getClass().getCanonicalName() + ")"); + + mProviders.add(provider); + } + } + + public void addAuditProviders(List<AuditProvider> providers) { + if(providers != null) { + for(AuditProvider provider : providers) { + addAuditProvider(provider); + } + } + } + + @Override + public void log(AuditEventBase event) { + for(AuditProvider provider : mProviders) { + try { + provider.log(event); + } catch(Throwable excp) { + 
logFailedEvent(event, excp); + } + } + } + + @Override + public void start() { + for(AuditProvider provider : mProviders) { + try { + provider.start(); + } catch(Throwable excp) { + LOG.error("AsyncAuditProvider.start(): failed for provider { " + provider.getClass().getName() + " }", excp); + } + } + } + + @Override + public void stop() { + for(AuditProvider provider : mProviders) { + try { + provider.stop(); + } catch(Throwable excp) { + LOG.error("AsyncAuditProvider.stop(): failed for provider { " + provider.getClass().getName() + " }", excp); + } + } + } + + @Override + public void waitToComplete() { + for(AuditProvider provider : mProviders) { + try { + provider.waitToComplete(); + } catch(Throwable excp) { + LOG.error("AsyncAuditProvider.waitToComplete(): failed for provider { " + provider.getClass().getName() + " }", excp); + } + } + } + + @Override + public boolean isFlushPending() { + for(AuditProvider provider : mProviders) { + if(provider.isFlushPending()) { + return true; + } + } + + return false; + } + + @Override + public long getLastFlushTime() { + long lastFlushTime = 0; + for(AuditProvider provider : mProviders) { + long flushTime = provider.getLastFlushTime(); + + if(flushTime != 0) { + if(lastFlushTime == 0 || lastFlushTime > flushTime) { + lastFlushTime = flushTime; + } + } + } + + return lastFlushTime; + } + + @Override + public void flush() { + for(AuditProvider provider : mProviders) { + try { + provider.flush(); + } catch(Throwable excp) { + LOG.error("AsyncAuditProvider.flush(): failed for provider { " + provider.getClass().getName() + " }", excp); + } + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java 
b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java new file mode 100644 index 0000000..620951c --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsAuditProvider.java @@ -0,0 +1,92 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.ranger.audit.provider.hdfs; + +import java.util.Map; +import java.util.Properties; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.provider.BaseAuditProvider; +import org.apache.ranger.audit.provider.BufferedAuditProvider; +import org.apache.ranger.audit.provider.DebugTracer; +import org.apache.ranger.audit.provider.LocalFileLogBuffer; +import org.apache.ranger.audit.provider.Log4jTracer; +import org.apache.ranger.audit.provider.MiscUtil; + +/** + * Buffered audit provider that pairs a LocalFileLogBuffer with an HdfsLogDestination, + * both configured from the "xasecure.audit.hdfs.config." properties. + */ +public class HdfsAuditProvider extends BufferedAuditProvider { + private static final Log LOG = LogFactory.getLog(HdfsAuditProvider.class); + + public static final String AUDIT_HDFS_IS_ASYNC_PROP = "xasecure.audit.hdfs.is.async"; + public static final String AUDIT_HDFS_MAX_QUEUE_SIZE_PROP = "xasecure.audit.hdfs.async.max.queue.size" ; + public static final String AUDIT_HDFS_MAX_FLUSH_INTERVAL_PROP = "xasecure.audit.hdfs.async.max.flush.interval.ms"; + + public HdfsAuditProvider() { + } + + /** + * Reads the HDFS destination and local buffer settings (missing or non-numeric numeric values + * fall back to the defaults coded below), builds both objects and registers them through + * setBufferAndDestination(). + */ + @Override + public void init(Properties props) { + LOG.info("HdfsAuditProvider.init()"); + + super.init(props); + + Map<String, String> hdfsProps = BaseAuditProvider.getPropertiesWithPrefix(props, "xasecure.audit.hdfs.config."); + + // the same encoding is applied to both the local buffer and the HDFS destination + String encoding = hdfsProps.get("encoding"); + + String hdfsDestinationDirectory = hdfsProps.get("destination.directory"); + String hdfsDestinationFile = hdfsProps.get("destination.file"); + int hdfsDestinationFlushIntervalSeconds = MiscUtil.parseInteger(hdfsProps.get("destination.flush.interval.seconds"), 15 * 60); + int hdfsDestinationRolloverIntervalSeconds = MiscUtil.parseInteger(hdfsProps.get("destination.rollover.interval.seconds"), 24 * 60 * 60); + int hdfsDestinationOpenRetryIntervalSeconds = MiscUtil.parseInteger(hdfsProps.get("destination.open.retry.interval.seconds"), 60); + + String localFileBufferDirectory = hdfsProps.get("local.buffer.directory"); + String 
localFileBufferFile = hdfsProps.get("local.buffer.file"); + int localFileBufferFlushIntervalSeconds = MiscUtil.parseInteger(hdfsProps.get("local.buffer.flush.interval.seconds"), 1 * 60); + int localFileBufferFileBufferSizeBytes = MiscUtil.parseInteger(hdfsProps.get("local.buffer.file.buffer.size.bytes"), 8 * 1024); + int localFileBufferRolloverIntervalSeconds = MiscUtil.parseInteger(hdfsProps.get("local.buffer.rollover.interval.seconds"), 10 * 60); + String localFileBufferArchiveDirectory = hdfsProps.get("local.archive.directory"); + int localFileBufferArchiveFileCount = MiscUtil.parseInteger(hdfsProps.get("local.archive.max.file.count"), 10); + + DebugTracer tracer = new Log4jTracer(LOG); + + HdfsLogDestination<AuditEventBase> hdfsDestination = new HdfsLogDestination<AuditEventBase>(tracer); + + hdfsDestination.setDirectory(hdfsDestinationDirectory); + hdfsDestination.setFile(hdfsDestinationFile); + hdfsDestination.setFlushIntervalSeconds(hdfsDestinationFlushIntervalSeconds); + hdfsDestination.setEncoding(encoding); + hdfsDestination.setRolloverIntervalSeconds(hdfsDestinationRolloverIntervalSeconds); + hdfsDestination.setOpenRetryIntervalSeconds(hdfsDestinationOpenRetryIntervalSeconds); + + LocalFileLogBuffer<AuditEventBase> localFileBuffer = new LocalFileLogBuffer<AuditEventBase>(tracer); + + localFileBuffer.setDirectory(localFileBufferDirectory); + localFileBuffer.setFile(localFileBufferFile); + localFileBuffer.setFlushIntervalSeconds(localFileBufferFlushIntervalSeconds); + localFileBuffer.setFileBufferSizeBytes(localFileBufferFileBufferSizeBytes); + localFileBuffer.setEncoding(encoding); + localFileBuffer.setRolloverIntervalSeconds(localFileBufferRolloverIntervalSeconds); + localFileBuffer.setArchiveDirectory(localFileBufferArchiveDirectory); + localFileBuffer.setArchiveFileCount(localFileBufferArchiveFileCount); + + setBufferAndDestination(localFileBuffer, hdfsDestination); + } +} + + + 
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java new file mode 100644 index 0000000..6b5cb4b --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/provider/hdfs/HdfsLogDestination.java @@ -0,0 +1,451 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ranger.audit.provider.hdfs; + + +import java.io.IOException; +import java.io.OutputStream; +import java.io.OutputStreamWriter; +import java.io.UnsupportedEncodingException; +import java.net.URI; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.ranger.audit.provider.DebugTracer; +import org.apache.ranger.audit.provider.LogDestination; +import org.apache.ranger.audit.provider.MiscUtil; + +public class HdfsLogDestination<T> implements LogDestination<T> { + public final static String EXCP_MSG_FILESYSTEM_CLOSED = "Filesystem closed"; + + private String mDirectory = null; + private String mFile = null; + private int mFlushIntervalSeconds = 1 * 60; + private String mEncoding = null; + private boolean mIsAppend = false; + private int mRolloverIntervalSeconds = 24 * 60 * 60; + private int mOpenRetryIntervalSeconds = 60; + private DebugTracer mLogger = null; + + private FSDataOutputStream mFsDataOutStream = null; + private OutputStreamWriter mWriter = null; + private String mHdfsFilename = null; + private long mNextRolloverTime = 0; + private long mNextFlushTime = 0; + private long mLastOpenFailedTime = 0; + private boolean mIsStopInProgress = false; + + public HdfsLogDestination(DebugTracer tracer) { + mLogger = tracer; + } + + public String getDirectory() { + return mDirectory; + } + + public void setDirectory(String directory) { + this.mDirectory = directory; + } + + public String getFile() { + return mFile; + } + + public void setFile(String file) { + this.mFile = file; + } + + public int getFlushIntervalSeconds() { + return mFlushIntervalSeconds; + } + + public void setFlushIntervalSeconds(int flushIntervalSeconds) { + mFlushIntervalSeconds = flushIntervalSeconds; + } + + public String getEncoding() { + return mEncoding; + } + + public void setEncoding(String encoding) { + mEncoding = encoding; + } + + 
public int getRolloverIntervalSeconds() { + return mRolloverIntervalSeconds; + } + + public void setRolloverIntervalSeconds(int rolloverIntervalSeconds) { + this.mRolloverIntervalSeconds = rolloverIntervalSeconds; + } + + public int getOpenRetryIntervalSeconds() { + return mOpenRetryIntervalSeconds; + } + + public void setOpenRetryIntervalSeconds(int minIntervalOpenRetrySeconds) { + this.mOpenRetryIntervalSeconds = minIntervalOpenRetrySeconds; + } + + @Override + public void start() { + mLogger.debug("==> HdfsLogDestination.start()"); + + openFile(); + + mLogger.debug("<== HdfsLogDestination.start()"); + } + + @Override + public void stop() { + mLogger.debug("==> HdfsLogDestination.stop()"); + + mIsStopInProgress = true; + + closeFile(); + + mIsStopInProgress = false; + + mLogger.debug("<== HdfsLogDestination.stop()"); + } + + @Override + public boolean isAvailable() { + return mWriter != null; + } + + @Override + public boolean send(T log) { + boolean ret = false; + + if(log != null) { + String msg = log.toString(); + + ret = sendStringified(msg); + } + + return ret; + } + + @Override + public boolean sendStringified(String log) { + boolean ret = false; + + checkFileStatus(); + + OutputStreamWriter writer = mWriter; + + if(writer != null) { + try { + writer.write(log + MiscUtil.LINE_SEPARATOR); + + ret = true; + } catch (IOException excp) { + mLogger.warn("HdfsLogDestination.sendStringified(): write failed", excp); + + closeFile(); + } + } + + return ret; + } + + @Override + public boolean flush() { + mLogger.debug("==> HdfsLogDestination.flush()"); + + boolean ret = false; + + OutputStreamWriter writer = mWriter; + + if(writer != null) { + try { + writer.flush(); + + ret = true; + } catch (IOException excp) { + logException("HdfsLogDestination: flush() failed", excp); + } + } + + FSDataOutputStream ostream = mFsDataOutStream; + + if(ostream != null) { + try { + ostream.hflush(); + + ret = true; + } catch (IOException excp) { + logException("HdfsLogDestination: 
hflush() failed", excp); + } + } + + if(ret) { + mNextFlushTime = System.currentTimeMillis() + (mFlushIntervalSeconds * 1000L); + } + + mLogger.debug("<== HdfsLogDestination.flush()"); + + return ret; + } + + private void openFile() { + mLogger.debug("==> HdfsLogDestination.openFile()"); + + closeFile(); + + mNextRolloverTime = MiscUtil.getNextRolloverTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000L)); + + long startTime = MiscUtil.getRolloverStartTime(mNextRolloverTime, (mRolloverIntervalSeconds * 1000L)); + + mHdfsFilename = MiscUtil.replaceTokens(mDirectory + org.apache.hadoop.fs.Path.SEPARATOR + mFile, startTime); + + FSDataOutputStream ostream = null; + FileSystem fileSystem = null; + Path pathLogfile = null; + Configuration conf = null; + boolean bOverwrite = false; + + try { + mLogger.debug("HdfsLogDestination.openFile(): opening file " + mHdfsFilename); + + URI uri = URI.create(mHdfsFilename); + + // TODO: mechanism to XA-HDFS plugin to disable auditing of access checks to the current HDFS file + + conf = new Configuration(); + pathLogfile = new Path(mHdfsFilename); + fileSystem = FileSystem.get(uri, conf); + + try { + if(fileSystem.exists(pathLogfile)) { // file already exists. either append to the file or write to a new file + if(mIsAppend) { + mLogger.info("HdfsLogDestination.openFile(): opening file for append " + mHdfsFilename); + + ostream = fileSystem.append(pathLogfile); + } else { + mHdfsFilename = getNewFilename(mHdfsFilename, fileSystem); + pathLogfile = new Path(mHdfsFilename); + } + } + + // if file does not exist or if mIsAppend==false, create the file + if(ostream == null) { + mLogger.info("HdfsLogDestination.openFile(): opening file for write " + mHdfsFilename); + + createParents(pathLogfile, fileSystem); + ostream = fileSystem.create(pathLogfile, bOverwrite); + } + } catch(IOException excp) { + // append may not be supported by the filesystem; or the file might already be open by another application. 
Try a different filename + String failedFilename = mHdfsFilename; + + mHdfsFilename = getNewFilename(mHdfsFilename, fileSystem); + pathLogfile = new Path(mHdfsFilename); + + mLogger.info("HdfsLogDestination.openFile(): failed in opening file " + failedFilename + ". Will try opening " + mHdfsFilename); + } + + if(ostream == null){ + mLogger.info("HdfsLogDestination.openFile(): opening file for write " + mHdfsFilename); + + createParents(pathLogfile, fileSystem); + ostream = fileSystem.create(pathLogfile, bOverwrite); + } + } catch(Throwable ex) { + mLogger.warn("HdfsLogDestination.openFile() failed", ex); + } finally { + // TODO: unset the property set above to exclude auditing of logfile opening + // System.setProperty(hdfsCurrentFilenameProperty, null); + } + + mWriter = createWriter(ostream); + + if(mWriter != null) { + mLogger.debug("HdfsLogDestination.openFile(): opened file " + mHdfsFilename); + + mFsDataOutStream = ostream; + mNextFlushTime = System.currentTimeMillis() + (mFlushIntervalSeconds * 1000L); + mLastOpenFailedTime = 0; + } else { + mLogger.warn("HdfsLogDestination.openFile(): failed to open file for write " + mHdfsFilename); + + mHdfsFilename = null; + mLastOpenFailedTime = System.currentTimeMillis(); + } + + mLogger.debug("<== HdfsLogDestination.openFile(" + mHdfsFilename + ")"); + } + + private void closeFile() { + mLogger.debug("==> HdfsLogDestination.closeFile()"); + + flush(); + + OutputStreamWriter writer = mWriter; + + mWriter = null; + mFsDataOutStream = null; + + if(writer != null) { + try { + mLogger.info("HdfsLogDestination.closeFile(): closing file " + mHdfsFilename); + + writer.close(); + } catch(IOException excp) { + logException("HdfsLogDestination: failed to close file " + mHdfsFilename, excp); + } + } + + mLogger.debug("<== HdfsLogDestination.closeFile()"); + } + + private void rollover() { + mLogger.debug("==> HdfsLogDestination.rollover()"); + + closeFile(); + + openFile(); + + mLogger.debug("<== HdfsLogDestination.rollover()"); 
+ } + + private void checkFileStatus() { + long now = System.currentTimeMillis(); + + if(mWriter == null) { + if(now > (mLastOpenFailedTime + (mOpenRetryIntervalSeconds * 1000L))) { + openFile(); + } + } else if(now > mNextRolloverTime) { + rollover(); + } else if(now > mNextFlushTime) { + flush(); + } + } + + private OutputStreamWriter createWriter(OutputStream os ) { + OutputStreamWriter writer = null; + + if(os != null) { + if(mEncoding != null) { + try { + writer = new OutputStreamWriter(os, mEncoding); + } catch(UnsupportedEncodingException excp) { + mLogger.warn("HdfsLogDestination.createWriter(): failed to create output writer.", excp); + } + } + + if(writer == null) { + writer = new OutputStreamWriter(os); + } + } + + return writer; + } + + private void createParents(Path pathLogfile, FileSystem fileSystem) { + try { + Path parentPath = pathLogfile != null ? pathLogfile.getParent() : null; + + if(parentPath != null && fileSystem != null && !fileSystem.exists(parentPath)) { + fileSystem.mkdirs(parentPath); + } + } catch (IOException e) { + logException("HdfsLogDestination.createParents() failed", e); + } catch (Throwable e) { + mLogger.warn("HdfsLogDestination.createParents() failed", e); + } + } + + private String getNewFilename(String fileName, FileSystem fileSystem) { + if(fileName == null) { + return ""; + } + + for(int i = 1; ; i++) { + String ret = fileName; + + String strToAppend = "-" + Integer.toString(i); + + int extnPos = ret.lastIndexOf("."); + + if(extnPos < 0) { + ret += strToAppend; + } else { + String extn = ret.substring(extnPos); + + ret = ret.substring(0, extnPos) + strToAppend + extn; + } + + if(fileSystem != null && fileExists(ret, fileSystem)) { + continue; + } else { + return ret; + } + } + } + + private boolean fileExists(String fileName, FileSystem fileSystem) { + boolean ret = false; + + if(fileName != null && fileSystem != null) { + Path path = new Path(fileName); + + try { + ret = fileSystem.exists(path); + } catch(IOException 
excp) { + // ignore + } + } + + return ret; + } + + private void logException(String msg, IOException excp) { + // during shutdown, the underlying FileSystem might already be closed; so don't print error details + + if(mIsStopInProgress) { + return; + } + + String excpMsgToExclude = EXCP_MSG_FILESYSTEM_CLOSED;; + String excpMsg = excp != null ? excp.getMessage() : null; + boolean excpExcludeLogging = (excpMsg != null && excpMsg.contains(excpMsgToExclude)); + + if(! excpExcludeLogging) { + mLogger.warn(msg, excp); + } + } + + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + + sb.append("HdfsLogDestination {"); + sb.append("Directory=").append(mDirectory).append("; "); + sb.append("File=").append(mFile).append("; "); + sb.append("RolloverIntervalSeconds=").append(mRolloverIntervalSeconds); + sb.append("}"); + + return sb.toString(); + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java b/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java new file mode 100644 index 0000000..34b8e4b --- /dev/null +++ b/agents-audit/src/main/java/org/apache/ranger/audit/test/TestEvents.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + + package org.apache.ranger.audit.test; +import org.apache.commons.logging.Log; +import org.apache.log4j.xml.DOMConfigurator; +import org.apache.ranger.audit.model.AuditEventBase; +import org.apache.ranger.audit.model.HBaseAuditEvent; +import org.apache.ranger.audit.model.HdfsAuditEvent; +import org.apache.ranger.audit.model.HiveAuditEvent; +import org.apache.ranger.audit.model.KnoxAuditEvent; +import org.apache.ranger.audit.model.StormAuditEvent; +import org.apache.ranger.audit.provider.AuditProvider; +import org.apache.ranger.audit.provider.AuditProviderFactory; +import org.apache.ranger.audit.provider.AuditProviderFactory.ApplicationType; +import org.apache.commons.logging.LogFactory; + +import java.io.File; +import java.io.FileInputStream; +import java.util.Date; +import java.util.Properties; + +public class TestEvents { + + private static final Log LOG = LogFactory.getLog(TestEvents.class); + + public static void main(String[] args) { + DOMConfigurator.configure("log4j.xml"); + + LOG.info("==> TestEvents.main()"); + + try { + Properties auditProperties = new Properties(); + + String AUDIT_PROPERTIES_FILE = "xasecure-audit.properties"; + + File propFile = new File(AUDIT_PROPERTIES_FILE); + + if(propFile.exists()) { + LOG.info("Loading Audit properties file" + AUDIT_PROPERTIES_FILE); + + auditProperties.load(new FileInputStream(propFile)); + } else { + LOG.info("Audit properties file missing: " + AUDIT_PROPERTIES_FILE); + + auditProperties.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.url", "jdbc:mysql://localhost:3306/xa_db"); + 
auditProperties.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.user", "xaaudit"); + auditProperties.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.password", "xaaudit"); + auditProperties.setProperty("xasecure.audit.jpa.javax.persistence.jdbc.driver", "com.mysql.jdbc.Driver"); + + auditProperties.setProperty("xasecure.audit.is.enabled", "true"); + auditProperties.setProperty("xasecure.audit.log4j.is.enabled", "false"); + auditProperties.setProperty("xasecure.audit.log4j.is.async", "false"); + auditProperties.setProperty("xasecure.audit.log4j.async.max.queue.size", "100000"); + auditProperties.setProperty("xasecure.audit.log4j.async.max.flush.interval.ms", "30000"); + auditProperties.setProperty("xasecure.audit.db.is.enabled", "true"); + auditProperties.setProperty("xasecure.audit.db.is.async", "true"); + auditProperties.setProperty("xasecure.audit.db.async.max.queue.size", "100000"); + auditProperties.setProperty("xasecure.audit.db.async.max.flush.interval.ms", "30000"); + auditProperties.setProperty("xasecure.audit.db.batch.size", "100"); + } + + AuditProviderFactory.getInstance().init(auditProperties, ApplicationType.Hdfs); + + AuditProvider provider = AuditProviderFactory.getAuditProvider(); + + LOG.info("provider=" + provider.toString()); + + String strEventCount = args.length > 0 ? args[0] : auditProperties.getProperty("xasecure.audit.test.event.count"); + String strEventPauseTimeInMs = args.length > 1 ? args[1] : auditProperties.getProperty("xasecure.audit.test.event.pause.time.ms"); + String strSleepTimeBeforeExit = args.length > 2 ? args[2] : auditProperties.getProperty("xasecure.audit.test.sleep.time.before.exit.seconds"); + + int eventCount = (strEventCount == null) ? 1024 : Integer.parseInt(strEventCount); + int eventPauseTime = (strEventPauseTimeInMs == null) ? 0 : Integer.parseInt(strEventPauseTimeInMs); + int sleepTimeBeforeExit = ((strSleepTimeBeforeExit == null) ? 
0 : Integer.parseInt(strSleepTimeBeforeExit)) * 1000; + + for(int i = 0; i < eventCount; i++) { + AuditEventBase event = getTestEvent(i); + + LOG.info("==> TestEvents.main(" + (i+1) + "): adding " + event.getClass().getName()); + provider.log(event); + + if(eventPauseTime > 0) { + Thread.sleep(eventPauseTime); + } + } + + provider.waitToComplete(); + + // incase of HdfsAuditProvider, logs are saved to local file system which gets sent to HDFS asynchronusly in a separate thread. + // So, at this point it is possible that few local log files haven't made to HDFS. + if(sleepTimeBeforeExit > 0) { + LOG.info("waiting for " + sleepTimeBeforeExit + "ms before exiting.."); + + try { + Thread.sleep(sleepTimeBeforeExit); + } catch(Exception excp) { + LOG.info("error while waiting before exiting.."); + } + } + + provider.stop(); + } catch(Exception excp) { + LOG.info(excp.getLocalizedMessage()); + excp.printStackTrace(); + } + + LOG.info("<== TestEvents.main()"); + } + + private static AuditEventBase getTestEvent(int idx) { + AuditEventBase event = null; + + switch(idx % 5) { + case 0: + event = new HdfsAuditEvent(); + break; + case 1: + event = new HBaseAuditEvent(); + break; + case 2: + event = new HiveAuditEvent(); + break; + case 3: + event = new KnoxAuditEvent(); + break; + case 4: + event = new StormAuditEvent(); + break; + } + event.setEventTime(new Date()); + event.setResultReason(Integer.toString(idx)); + + return event; + } +} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-audit/src/main/resources/META-INF/persistence.xml ---------------------------------------------------------------------- diff --git a/agents-audit/src/main/resources/META-INF/persistence.xml b/agents-audit/src/main/resources/META-INF/persistence.xml index 0b87ab9..21b8f06 100644 --- a/agents-audit/src/main/resources/META-INF/persistence.xml +++ b/agents-audit/src/main/resources/META-INF/persistence.xml @@ -17,12 +17,12 @@ --> <persistence version="2.0" 
xmlns="http://java.sun.com/xml/ns/persistence" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/persistence http://java.sun.com/xml/ns/persistence/persistence_2_0.xsd"> <persistence-unit name="xa_server"> - <class>com.xasecure.audit.entity.XXBaseAuditEvent</class> - <class>com.xasecure.audit.entity.XXHBaseAuditEvent</class> - <class>com.xasecure.audit.entity.XXHdfsAuditEvent</class> - <class>com.xasecure.audit.entity.XXHiveAuditEvent</class> - <class>com.xasecure.audit.entity.XXKnoxAuditEvent</class> - <class>com.xasecure.audit.entity.XXStormAuditEvent</class> + <class>org.apache.ranger.audit.entity.XXBaseAuditEvent</class> + <class>org.apache.ranger.audit.entity.XXHBaseAuditEvent</class> + <class>org.apache.ranger.audit.entity.XXHdfsAuditEvent</class> + <class>org.apache.ranger.audit.entity.XXHiveAuditEvent</class> + <class>org.apache.ranger.audit.entity.XXKnoxAuditEvent</class> + <class>org.apache.ranger.audit.entity.XXStormAuditEvent</class> <properties> <property name="eclipselink.logging.level" value="SEVERE"/> http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/conf/log4j.properties ---------------------------------------------------------------------- diff --git a/agents-common/conf/log4j.properties b/agents-common/conf/log4j.properties index ca599f2..dd22c6d 100644 --- a/agents-common/conf/log4j.properties +++ b/agents-common/conf/log4j.properties @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. 
-log4j.logger.xaaudit.com.xasecure.audit.provider.Log4jAuditProvider=INFO, hdfsAppender +log4j.logger.xaaudit.org.apache.ranger.audit.provider.Log4jAuditProvider=INFO, hdfsAppender log4j.appender.hdfsAppender=org.apache.log4j.HdfsRollingFileAppender http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/scripts/enable-agent.sh ---------------------------------------------------------------------- diff --git a/agents-common/scripts/enable-agent.sh b/agents-common/scripts/enable-agent.sh index 8024b74..f8d90ad 100755 --- a/agents-common/scripts/enable-agent.sh +++ b/agents-common/scripts/enable-agent.sh @@ -185,7 +185,7 @@ create_jceks() { tempFile=/tmp/jce.$$.out - $JAVA_HOME/bin/java -cp ":${PROJ_INSTALL_LIB_DIR}/*:" com.hortonworks.credentialapi.buildks create "${alias}" -value "${pass}" -provider "jceks://file${jceksFile}" > ${tempFile} 2>&1 + $JAVA_HOME/bin/java -cp ":${PROJ_INSTALL_LIB_DIR}/*:" org.apache.ranger.credentialapi.buildks create "${alias}" -value "${pass}" -provider "jceks://file${jceksFile}" > ${tempFile} 2>&1 if [ $? -ne 0 ] then @@ -359,7 +359,7 @@ then cp ${fullpathorgfn} ${archivefn} if [ $? -eq 0 ] then - ${JAVA} -cp "${INSTALL_CP}" com.xasecure.utils.install.XmlConfigChanger -i ${archivefn} -o ${newfn} -c ${f} -p ${INSTALL_ARGS} + ${JAVA} -cp "${INSTALL_CP}" org.apache.ranger.utils.install.XmlConfigChanger -i ${archivefn} -o ${newfn} -c ${f} -p ${INSTALL_ARGS} if [ $? 
-eq 0 ] then diff -w ${newfn} ${fullpathorgfn} > /dev/null 2>&1 @@ -544,13 +544,13 @@ then } { if ($1 == "nimbus.authorizer") { - if ($2 ~ /^[ \t]*"com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer"[ \t]*$/) { + if ($2 ~ /^[ \t]*"org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer"[ \t]*$/) { configured = 1 ; printf("%s\n",$0) ; } else { printf("#%s\n",$0); - printf("nimbus.authorizer: \"com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer\"\n") ; + printf("nimbus.authorizer: \"org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer\"\n") ; configured = 1 ; } } @@ -560,7 +560,7 @@ then } END { if (configured == 0) { - printf("nimbus.authorizer: \"com.xasecure.authorization.storm.authorizer.XaSecureStormAuthorizer\"\n") ; + printf("nimbus.authorizer: \"org.apache.ranger.authorization.storm.authorizer.RangerStormAuthorizer\"\n") ; } }' ${CFG_FILE} > ${CFG_FILE}.new && cat ${CFG_FILE}.new > ${CFG_FILE} && rm -f ${CFG_FILE}.new http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/admin/client/XaAdminClient.java ---------------------------------------------------------------------- diff --git a/agents-common/src/main/java/com/xasecure/admin/client/XaAdminClient.java b/agents-common/src/main/java/com/xasecure/admin/client/XaAdminClient.java deleted file mode 100644 index 74d096a..0000000 --- a/agents-common/src/main/java/com/xasecure/admin/client/XaAdminClient.java +++ /dev/null @@ -1,32 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - package com.xasecure.admin.client; - - -import com.xasecure.admin.client.datatype.GrantRevokeData; - - -public interface XaAdminClient { - String getPolicies(String repositoryName, long lastModifiedTime, int policyCount, String agentName); - - void grantPrivilege(GrantRevokeData grData) throws Exception; - - void revokePrivilege(GrantRevokeData grData) throws Exception; -}
