This is an automated email from the ASF dual-hosted git repository.

rmani pushed a commit to branch ranger-2.2
in repository https://gitbox.apache.org/repos/asf/ranger.git


The following commit(s) were added to refs/heads/ranger-2.2 by this push:
     new 0c67522  RANGER-3226:Ranger Audit framework to handle 
UnsupportedOperationException while writing into S3AFileSystem with hflush api
0c67522 is described below

commit 0c6752205d849fad674e5f907679a3eaa98433f5
Author: Ramesh Mani <[email protected]>
AuthorDate: Tue Mar 30 00:13:46 2021 -0700

    RANGER-3226:Ranger Audit framework to handle UnsupportedOperationException 
while writing into S3AFileSystem with hflush api
    
    Change-Id: I3904609af718b2f9372b5d4569507fc6e71ededa
    Signed-off-by: Ramesh Mani <[email protected]>
---
 .../ranger/audit/destination/HDFSAuditDestination.java       | 12 +++++++++++-
 1 file changed, 11 insertions(+), 1 deletion(-)

diff --git 
a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
index 906ff34..5e6f402 100644
--- 
a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
+++ 
b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.ranger.audit.model.AuditEventBase;
 import org.apache.ranger.audit.provider.MiscUtil;
 import org.apache.ranger.audit.utils.RollingTimeUtil;
@@ -74,6 +75,8 @@ public class HDFSAuditDestination extends AuditDestination {
 
        private boolean rollOverByDuration  = false;
 
+       private boolean isHFlushCapableStream    = false;
+
        @Override
        public void init(Properties prop, String propPrefix) {
                super.init(prop, propPrefix);
@@ -299,6 +302,7 @@ public class HDFSAuditDestination extends AuditDestination {
                        ostream = fileSystem.create(hdfPath);
                        logWriter = new PrintWriter(ostream);
                        currentFileName = fullPath;
+                       isHFlushCapableStream = 
ostream.hasCapability(StreamCapabilities.HFLUSH);
                }
                return logWriter;
        }
@@ -374,7 +378,13 @@ public class HDFSAuditDestination extends AuditDestination 
{
                                                // 1) PrinterWriter does not 
have bufferring of its own so
                                                // we need to flush its 
underlying stream
                                                // 2) HDFS flush() does not 
really flush all the way to disk.
-                                               ostream.hflush();
+                                               if (isHFlushCapableStream) {
+                                                       //Checking HFLUSH 
capability of the stream because of HADOOP-13327.
+                                                       //For S3 filesystem, 
hflush throws UnsupportedOperationException and hence we call flush.
+                                                       ostream.hflush();
+                                               } else {
+                                                       ostream.flush();
+                                               }
                                        logger.info("Flush HDFS audit logs 
completed.....");
                                }
                        } catch (IOException e) {

Reply via email to