This is an automated email from the ASF dual-hosted git repository.
rmani pushed a commit to branch ranger-2.2
in repository https://gitbox.apache.org/repos/asf/ranger.git
The following commit(s) were added to refs/heads/ranger-2.2 by this push:
new 0c67522 RANGER-3226:Ranger Audit framework to handle
UnsupportedOperationException while writing into S3AFileSystem with hflush api
0c67522 is described below
commit 0c6752205d849fad674e5f907679a3eaa98433f5
Author: Ramesh Mani <[email protected]>
AuthorDate: Tue Mar 30 00:13:46 2021 -0700
RANGER-3226:Ranger Audit framework to handle UnsupportedOperationException
while writing into S3AFileSystem with hflush api
Change-Id: I3904609af718b2f9372b5d4569507fc6e71ededa
Signed-off-by: Ramesh Mani <[email protected]>
---
.../ranger/audit/destination/HDFSAuditDestination.java | 12 +++++++++++-
1 file changed, 11 insertions(+), 1 deletion(-)
diff --git a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
index 906ff34..5e6f402 100644
--- a/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
+++ b/agents-audit/src/main/java/org/apache/ranger/audit/destination/HDFSAuditDestination.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.StreamCapabilities;
import org.apache.ranger.audit.model.AuditEventBase;
import org.apache.ranger.audit.provider.MiscUtil;
import org.apache.ranger.audit.utils.RollingTimeUtil;
@@ -74,6 +75,8 @@ public class HDFSAuditDestination extends AuditDestination {
private boolean rollOverByDuration = false;
+ private boolean isHFlushCapableStream = false;
+
@Override
public void init(Properties prop, String propPrefix) {
super.init(prop, propPrefix);
@@ -299,6 +302,7 @@ public class HDFSAuditDestination extends AuditDestination {
ostream = fileSystem.create(hdfPath);
logWriter = new PrintWriter(ostream);
currentFileName = fullPath;
+ isHFlushCapableStream = ostream.hasCapability(StreamCapabilities.HFLUSH);
}
return logWriter;
}
@@ -374,7 +378,13 @@ public class HDFSAuditDestination extends AuditDestination {
// 1) PrinterWriter does not have bufferring of its own so
// we need to flush its underlying stream
// 2) HDFS flush() does not really flush all the way to disk.
- ostream.hflush();
+ if (isHFlushCapableStream) {
+ //Checking HFLUSH capability of the stream because of HADOOP-13327.
+ //For S3 filesystem, hflush throws UnsupportedOperationException and hence we call flush.
+ ostream.hflush();
+ } else {
+ ostream.flush();
+ }
logger.info("Flush HDFS audit logs completed.....");
}
} catch (IOException e) {