This is an automated email from the ASF dual-hosted git repository.

abhi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/master by this push:
     new 4d9648ba7 RANGER-4342: Fix new log file creation when errors/exceptions occur writing audits to HDFS as Json (#277)
4d9648ba7 is described below

commit 4d9648ba7b0ac47fbeedc14c2112145ac4c289f9
Author: Abhishek Kumar <abhishekkumar100...@gmail.com>
AuthorDate: Tue Sep 12 12:54:39 2023 -0700

    RANGER-4342: Fix new log file creation when errors/exceptions occur writing audits to HDFS as Json (#277)
    
    * Append to last log file in case of errors/exceptions encountered
    * Close streams and reset writers
    * Add unit tests
    * Fallback to create if append fails or is not supported
    ---------
    
    Co-authored-by: abhishek-kumar <abhishek.ku...@cloudera.com>
---
 agents-audit/pom.xml                               |  12 ++
 .../apache/ranger/audit/queue/AuditBatchQueue.java |  49 +++-----
 .../audit/utils/AbstractRangerAuditWriter.java     |  74 +++++++++----
 .../ranger/audit/utils/RangerJSONAuditWriter.java  |  27 +++--
 .../audit/utils/RangerJSONAuditWriterTest.java     | 123 +++++++++++++++++++++
 5 files changed, 220 insertions(+), 65 deletions(-)

diff --git a/agents-audit/pom.xml b/agents-audit/pom.xml
index 82ff16291..791519b39 100644
--- a/agents-audit/pom.xml
+++ b/agents-audit/pom.xml
@@ -394,5 +394,17 @@
             <version>${slf4j.version}</version>
            <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.testng</groupId>
+            <artifactId>testng</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+        </dependency>
     </dependencies>
 </project>
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
index afa2879e2..103f92656 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/queue/AuditBatchQueue.java
@@ -237,15 +237,12 @@ public class AuditBatchQueue extends AuditQueue 
implements Runnable {
                        boolean fileSpoolDrain = false;
                        try {
                                if (fileSpoolerEnabled && 
fileSpooler.isPending()) {
-                                       int percentUsed = queue.size() * 100
-                                                       / getMaxQueueSize();
-                                       long lastAttemptDelta = fileSpooler
-                                                       
.getLastAttemptTimeDelta();
+                                       int percentUsed = queue.size() * 100 / 
getMaxQueueSize();
+                                       long lastAttemptDelta = 
fileSpooler.getLastAttemptTimeDelta();
 
                                        fileSpoolDrain = lastAttemptDelta > 
fileSpoolMaxWaitTime;
                                        // If we should even read from queue?
-                                       if (!isDrain() && !fileSpoolDrain
-                                                       && percentUsed < 
fileSpoolDrainThresholdPercent) {
+                                       if (!isDrain() && !fileSpoolDrain && 
percentUsed < fileSpoolDrainThresholdPercent) {
                                                // Since some files are still 
under progress and it is
                                                // not in drain mode, lets wait 
and retry
                                                if (nextDispatchDuration > 0) {
@@ -259,10 +256,8 @@ public class AuditBatchQueue extends AuditQueue implements 
Runnable {
 
                                AuditEventBase event = null;
 
-                               if (!isToSpool && !isDrain() && !fileSpoolDrain
-                                               && nextDispatchDuration > 0) {
-                                       event = queue.poll(nextDispatchDuration,
-                                                       TimeUnit.MILLISECONDS);
+                               if (!isToSpool && !isDrain() && !fileSpoolDrain 
&& nextDispatchDuration > 0) {
+                                       event = 
queue.poll(nextDispatchDuration, TimeUnit.MILLISECONDS);
                                } else {
                                        // For poll() is non blocking
                                        event = queue.poll();
@@ -271,15 +266,11 @@ public class AuditBatchQueue extends AuditQueue 
implements Runnable {
                                if (event != null) {
                                        localBatchBuffer.add(event);
                                        if (getMaxBatchSize() >= 
localBatchBuffer.size()) {
-                                               queue.drainTo(localBatchBuffer, 
getMaxBatchSize()
-                                                               - 
localBatchBuffer.size());
+                                               queue.drainTo(localBatchBuffer, 
getMaxBatchSize() - localBatchBuffer.size());
                                        }
                                } else {
                                        // poll returned due to timeout, so 
reseting clock
-                                       nextDispatchDuration = lastDispatchTime
-                                                       - 
System.currentTimeMillis()
-                                                       + getMaxBatchInterval();
-
+                                       nextDispatchDuration = lastDispatchTime 
- System.currentTimeMillis() + getMaxBatchInterval();
                                        lastDispatchTime = 
System.currentTimeMillis();
                                }
                        } catch (InterruptedException e) {
@@ -293,8 +284,7 @@ public class AuditBatchQueue extends AuditQueue implements 
Runnable {
                        if (localBatchBuffer.size() > 0 && isToSpool) {
                                // Let spool to the file directly
                                if (isDestActive) {
-                                       logger.info("Switching to file spool. 
Queue=" + getName()
-                                                       + ", dest=" + 
consumer.getName());
+                                       logger.info("Switching to file spool. 
Queue = {}, dest = {}", getName(), consumer.getName());
                                }
                                isDestActive = false;
                                // Just before stashing
@@ -302,20 +292,18 @@ public class AuditBatchQueue extends AuditQueue 
implements Runnable {
                                fileSpooler.stashLogs(localBatchBuffer);
                                addStashedCount(localBatchBuffer.size());
                                localBatchBuffer.clear();
-                       } else if (localBatchBuffer.size() > 0
-                                       && (isDrain()
-                                                       || 
localBatchBuffer.size() >= getMaxBatchSize() || nextDispatchDuration <= 0)) {
+                       } else if (localBatchBuffer.size() > 0 &&
+                                       (isDrain() || localBatchBuffer.size() 
>= getMaxBatchSize() || nextDispatchDuration <= 0)) {
                                if (fileSpoolerEnabled && !isDestActive) {
-                                       logger.info("Switching to writing to 
destination. Queue="
-                                                       + getName() + ", dest=" 
+ consumer.getName());
+                                       logger.info("Switching to writing to 
the destination. Queue = {}, dest = {}",
+                                                       getName(), 
consumer.getName());
                                }
                                // Reset time just before sending the logs
                                lastDispatchTime = System.currentTimeMillis();
                                boolean ret = consumer.log(localBatchBuffer);
                                if (!ret) {
                                        if (fileSpoolerEnabled) {
-                                               logger.info("Switching to file 
spool. Queue="
-                                                               + getName() + 
", dest=" + consumer.getName());
+                                               logger.info("Switching to file 
spool. Queue = {}, dest = {}", getName(), consumer.getName());
                                                // Transient error. Stash and 
move on
                                                
fileSpooler.stashLogs(localBatchBuffer);
                                                isDestActive = false;
@@ -334,9 +322,8 @@ public class AuditBatchQueue extends AuditQueue implements 
Runnable {
 
                        if (isDrain()) {
                                if (!queue.isEmpty() || localBatchBuffer.size() 
> 0) {
-                                       logger.info("Queue is not empty. Will 
retry. queue.size)="
-                                                       + queue.size() + ", 
localBatchBuffer.size()="
-                                                       + 
localBatchBuffer.size());
+                                       logger.info("Queue is not empty. Will 
retry. queue.size = {}, localBatchBuffer.size = {}",
+                                                       queue.size(), 
localBatchBuffer.size());
                                } else {
                                        break;
                                }
@@ -349,12 +336,10 @@ public class AuditBatchQueue extends AuditQueue 
implements Runnable {
                        }
                }
 
-               logger.info("Exiting consumerThread. Queue=" + getName() + ", 
dest="
-                               + consumer.getName());
+               logger.info("Exiting consumerThread. Queue = {}, dest = {}", 
getName(), consumer.getName());
                try {
                        // Call stop on the consumer
-                       logger.info("Calling to stop consumer. name=" + 
getName()
-                                       + ", consumer.name=" + 
consumer.getName());
+                       logger.info("Calling to stop consumer. name = {}, 
consumer.name = {}", getName(), consumer.getName());
 
                        consumer.stop();
                        if (fileSpoolerEnabled) {
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/utils/AbstractRangerAuditWriter.java b/agents-audit/src/main/java/org/apache/ranger/audit/utils/AbstractRangerAuditWriter.java
index 17a7fb91d..0e74e3bd4 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/utils/AbstractRangerAuditWriter.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/utils/AbstractRangerAuditWriter.java
@@ -66,6 +66,7 @@ public abstract class AbstractRangerAuditWriter implements 
RangerAuditWriter {
     public boolean                rollOverByDuration               = false;
     public volatile FSDataOutputStream ostream                     = null;   
// output stream wrapped in logWriter
     private boolean               isHFlushCapableStream            = false;
+    protected boolean               reUseLastLogFile               = false;
 
     @Override
     public void init(Properties props, String propPrefix, String 
auditProviderName, Map<String,String> auditConfigs) {
@@ -207,29 +208,25 @@ public abstract class AbstractRangerAuditWriter 
implements RangerAuditWriter {
 
     }
 
-    public void closeFileIfNeeded() throws IOException {
+    public void closeFileIfNeeded() {
         if (logger.isDebugEnabled()) {
             logger.debug("==> AbstractRangerAuditWriter.closeFileIfNeeded()");
         }
 
         if (logWriter == null) {
+            if (logger.isDebugEnabled()){
+                logger.debug("Log writer is null, aborting rollover condition 
check!");
+            }
             return;
         }
 
         if ( System.currentTimeMillis() >= nextRollOverTime.getTime() ) {
-            logger.info("Closing file. Rolling over. name=" + auditProviderName
-                    + ", fileName=" + currentFileName);
-            try {
-                logWriter.flush();
-                logWriter.close();
-            } catch (Throwable t) {
-                logger.error("Error on closing log writter. Exception will be 
ignored. name="
-                        + auditProviderName + ", fileName=" + currentFileName);
-            }
-
-            logWriter = null;
-            ostream   = null;
+            logger.info("Closing file. Rolling over. name = {}, fileName = 
{}", auditProviderName, currentFileName);
+            logWriter.flush();
+            closeWriter();
+            resetWriter();
             currentFileName = null;
+            reUseLastLogFile = false;
 
             if (!rollOverByDuration) {
                 try {
@@ -238,7 +235,8 @@ public abstract class AbstractRangerAuditWriter implements 
RangerAuditWriter {
                     }
                     nextRollOverTime = 
rollingTimeUtil.computeNextRollingTime(rolloverPeriod);
                 } catch ( Exception e) {
-                    logger.warn("Rollover by file.rollover.period 
failed...will be using the file.rollover.sec for " + fileSystemScheme + " audit 
file rollover...", e);
+                    logger.warn("Rollover by file.rollover.period failed", e);
+                    logger.warn("Using the file.rollover.sec for {} audit file 
rollover...", fileSystemScheme);
                     nextRollOverTime = rollOverByDuration();
                 }
             } else {
@@ -262,10 +260,25 @@ public abstract class AbstractRangerAuditWriter 
implements RangerAuditWriter {
         }
 
         if (logWriter == null) {
-            // Create the file to write
-            logger.info("Creating new log file. auditPath=" + fullPath);
-            createFileSystemFolders();
-            ostream               = fileSystem.create(auditPath);
+            boolean appendMode = false;
+            // if append is supported, reuse last log file
+            if (reUseLastLogFile && fileSystem.hasPathCapability(auditPath, 
CommonPathCapabilities.FS_APPEND)) {
+                logger.info("Appending to last log file. auditPath = {}", 
fullPath);
+                try {
+                    ostream = fileSystem.append(auditPath);
+                    appendMode = true;
+                } catch (Exception e){
+                    logger.error("Failed to append to file {} due to {}", 
fullPath, e.getMessage());
+                    logger.info("Falling back to create a new log file!");
+                    appendMode = false;
+                }
+            }
+            if (!appendMode) {
+                // Create the file to write
+                logger.info("Creating new log file. auditPath = {}", fullPath);
+                createFileSystemFolders();
+                ostream = fileSystem.create(auditPath);
+            }
             logWriter             = new PrintWriter(ostream);
             isHFlushCapableStream = 
ostream.hasCapability(StreamCapabilities.HFLUSH);
         }
@@ -277,16 +290,39 @@ public abstract class AbstractRangerAuditWriter 
implements RangerAuditWriter {
         return logWriter;
     }
 
+    /**
+     * Closes the writer after writing audits
+     **/
     public void closeWriter() {
         if (logger.isDebugEnabled()) {
             logger.debug("==> AbstractRangerAuditWriter.closeWriter()");
         }
 
+        if (ostream != null) {
+            try {
+                ostream.close();
+            } catch (IOException e) {
+                logger.error("Error closing the stream {}", e.getMessage());
+            }
+        }
+        if (logWriter != null)
+            logWriter.close();
+
+        if (logger.isDebugEnabled()) {
+            logger.debug("<== AbstractRangerAuditWriter.closeWriter()");
+        }
+    }
+
+    public void resetWriter() {
+        if (logger.isDebugEnabled()) {
+            logger.debug("==> AbstractRangerAuditWriter.resetWriter()");
+        }
+
         logWriter = null;
         ostream = null;
 
         if (logger.isDebugEnabled()) {
-            logger.debug("<== AbstractRangerAuditWriter.closeWriter()");
+            logger.debug("<== AbstractRangerAuditWriter.resetWriter()");
         }
     }
 
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/utils/RangerJSONAuditWriter.java b/agents-audit/src/main/java/org/apache/ranger/audit/utils/RangerJSONAuditWriter.java
index 755b76df7..f74f0cbd3 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/utils/RangerJSONAuditWriter.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/utils/RangerJSONAuditWriter.java
@@ -88,12 +88,10 @@ public class RangerJSONAuditWriter extends 
AbstractRangerAuditWriter {
     }
 
     synchronized public boolean logJSON(final Collection<String> events) 
throws Exception {
-        boolean     ret = false;
         PrintWriter out = null;
         try {
             if (logger.isDebugEnabled()) {
-                logger.debug("UGI=" + MiscUtil.getUGILoginUser()
-                        + ". Will write to HDFS file=" + currentFileName);
+                logger.debug("UGI = {}, will write to HDFS file = {}", 
MiscUtil.getUGILoginUser(), currentFileName);
             }
             out = MiscUtil.executePrivilegedAction(new 
PrivilegedExceptionAction<PrintWriter>() {
                 @Override
@@ -108,28 +106,30 @@ public class RangerJSONAuditWriter extends 
AbstractRangerAuditWriter {
             // flush and check the stream for errors
             if (out.checkError()) {
                 // In theory, this count may NOT be accurate as part of the 
messages may have been successfully written.
-                // However, in practice, since client does buffering, either 
all of none would succeed.
-                out.close();
+                // However, in practice, since client does buffering, either 
all or none would succeed.
+                logger.error("Stream encountered errors while writing audits 
to HDFS!");
                 closeWriter();
-                return ret;
+                resetWriter();
+                reUseLastLogFile = true;
+                return false;
             }
         } catch (Exception e) {
-            if (out != null) {
-                out.close();
-            }
+            logger.error("Exception encountered while writing audits to 
HDFS!", e);
             closeWriter();
-            return ret;
+            resetWriter();
+            reUseLastLogFile = true;
+            return false;
         } finally {
-            ret = true;
             if (logger.isDebugEnabled()) {
                 logger.debug("Flushing HDFS audit. Event Size:" + 
events.size());
             }
             if (out != null) {
                 out.flush();
             }
+            //closeWriter();
         }
 
-        return ret;
+        return true;
     }
 
     @Override
@@ -166,8 +166,7 @@ public class RangerJSONAuditWriter extends 
AbstractRangerAuditWriter {
             // close the file inline with audit logging.
             closeFileIfNeeded();
         }
-        // Either there are no open log file or the previous one has been 
rolled
-        // over
+        // Either there are no open log file or the previous one has been 
rolled over
         PrintWriter logWriter = createWriter();
         return logWriter;
     }
diff --git a/agents-audit/src/test/java/org/apache/ranger/audit/utils/RangerJSONAuditWriterTest.java b/agents-audit/src/test/java/org/apache/ranger/audit/utils/RangerJSONAuditWriterTest.java
new file mode 100644
index 000000000..3d65790d5
--- /dev/null
+++ b/agents-audit/src/test/java/org/apache/ranger/audit/utils/RangerJSONAuditWriterTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ranger.audit.utils;
+
+import org.apache.hadoop.fs.CommonPathCapabilities;
+import org.apache.hadoop.fs.FileSystem;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.Map;
+import java.util.HashMap;
+import java.util.Properties;
+import java.util.Collections;
+import java.io.PrintWriter;
+
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class RangerJSONAuditWriterTest {
+
+    public Properties props;
+    public Map<String, String> auditConfigs;
+
+    public void setup(){
+        props = new Properties();
+        props.setProperty("test.dir", "/tmp");
+        auditConfigs = new HashMap<>();
+        auditConfigs.put(FileSystem.FS_DEFAULT_NAME_KEY, 
FileSystem.DEFAULT_FS);
+    }
+
+    @Test
+    public void checkReUseFlagInStreamErrors() throws Exception {
+
+        RangerJSONAuditWriter jsonAuditWriter = spy(new 
RangerJSONAuditWriter());
+        PrintWriter out = mock(PrintWriter.class);
+
+        setup();
+        jsonAuditWriter.init(props, "test", "localfs", auditConfigs);
+
+        assertFalse(jsonAuditWriter.reUseLastLogFile);
+        when(jsonAuditWriter.getLogFileStream()).thenReturn(out);
+        when(out.checkError()).thenReturn(true);
+        assertFalse(jsonAuditWriter.logJSON(Collections.singleton("This event 
will not be logged!")));
+        assertTrue(jsonAuditWriter.reUseLastLogFile);
+
+        // cleanup
+        jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);
+        jsonAuditWriter.logJSON(Collections.singleton("cleaning up!"));
+        jsonAuditWriter.closeWriter();
+    }
+
+    @Test
+    public void checkAppendtoFileWhenExceptionsOccur() throws Exception {
+        RangerJSONAuditWriter jsonAuditWriter = spy(new 
RangerJSONAuditWriter());
+
+        setup();
+        jsonAuditWriter.init(props, "test", "localfs", auditConfigs);
+        jsonAuditWriter.createFileSystemFolders();
+        // File creation should fail with an exception which will trigger 
append next time.
+        when(jsonAuditWriter.fileSystem.create(jsonAuditWriter.auditPath))
+                .thenThrow(new IOException("Creation not allowed!"));
+        jsonAuditWriter.logJSON(Collections.singleton("This event will not be 
logged!"));
+        jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);
+
+        assertTrue(jsonAuditWriter.reUseLastLogFile);
+        assertNull(jsonAuditWriter.ostream);
+        assertNull(jsonAuditWriter.logWriter);
+
+        jsonAuditWriter.fileSystem = mock(FileSystem.class);
+        when(jsonAuditWriter.fileSystem
+                .hasPathCapability(jsonAuditWriter.auditPath, 
CommonPathCapabilities.FS_APPEND)).thenReturn(true);
+        jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath);
+        // this will lead to an exception since append is called on mocks
+        jsonAuditWriter.logJSON(Collections.singleton(
+                "This event should be appended but won't be as appended we use 
mocks."));
+    }
+
+
+    @Test
+    public void checkFileRolloverAfterThreshold() throws Exception {
+        RangerJSONAuditWriter jsonAuditWriter = spy(new 
RangerJSONAuditWriter());
+
+        setup();
+        props.setProperty("test.file.rollover.enable.periodic.rollover", 
"true");
+        props.setProperty("test.file.rollover.periodic.rollover.check.sec", 
"2");
+        // rollover log file after this interval
+        jsonAuditWriter.fileRolloverSec = 5; // in seconds
+        jsonAuditWriter.init(props, "test", "localfs", auditConfigs);
+
+
+        assertTrue(jsonAuditWriter.logJSON(Collections.singleton("First file 
created and added this line!")));
+        jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath); // 
cleanup
+        Thread.sleep(6000);
+
+        assertFalse(jsonAuditWriter.reUseLastLogFile);
+        assertNull(jsonAuditWriter.ostream);
+        assertNull(jsonAuditWriter.logWriter);
+
+        assertTrue(jsonAuditWriter.logJSON(Collections.singleton("Second file 
created since rollover happened!")));
+        jsonAuditWriter.fileSystem.deleteOnExit(jsonAuditWriter.auditPath); // 
cleanup
+        jsonAuditWriter.closeWriter();
+    }
+}

Reply via email to