Repository: flume Updated Branches: refs/heads/trunk 62a4cad22 -> 990776427
FLUME-2443: org.apache.hadoop.fs.FSDataOutputStream.sync() is deprecated in hadoop 2.4 (Hari Shreedharan via Jarek Jarcec Cecho) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/99077642 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/99077642 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/99077642 Branch: refs/heads/trunk Commit: 990776427d0bb12e8775c31fc78d47bea8f6501f Parents: 62a4cad Author: Jarek Jarcec Cecho <[email protected]> Authored: Tue Feb 3 13:32:28 2015 -0800 Committer: Jarek Jarcec Cecho <[email protected]> Committed: Tue Feb 3 13:32:28 2015 -0800 ---------------------------------------------------------------------- .../flume/sink/hdfs/AbstractHDFSWriter.java | 51 ++++++++++++++++++++ .../sink/hdfs/HDFSCompressedDataStream.java | 4 +- .../apache/flume/sink/hdfs/HDFSDataStream.java | 4 +- .../flume/sink/hdfs/HDFSSequenceFile.java | 2 +- 4 files changed, 56 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/99077642/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java index 043ca6c..e367e12 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java @@ -19,6 +19,7 @@ package org.apache.flume.sink.hdfs; import com.google.common.base.Preconditions; import org.apache.flume.Context; +import org.apache.flume.FlumeException; import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.hadoop.fs.FSDataOutputStream; @@ -27,6 +28,7 @@ import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.IOException; import java.io.OutputStream; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -43,6 +45,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { private Path destPath; private Method refGetNumCurrentReplicas = null; private Method refGetDefaultReplication = null; + private Method refHflushOrSync = null; private Integer configuredMinReplicas = null; private Integer numberOfCloseRetries = null; private long timeBetweenCloseRetries = Long.MAX_VALUE; @@ -110,6 +113,7 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { this.destPath = destPath; this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream); this.refGetDefaultReplication = reflectGetDefaultReplication(fs); + this.refHflushOrSync = reflectHflushOrSync(outputStream); } @@ -225,4 +229,51 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { } return m; } + + private Method reflectHflushOrSync(FSDataOutputStream os) { + Method m = null; + if(os != null) { + Class<?> fsDataOutputStreamClass = os.getClass(); + try { + m = fsDataOutputStreamClass.getMethod("hflush"); + } catch (NoSuchMethodException ex) { + logger.debug("HFlush not found. Will use sync() instead"); + try { + m = fsDataOutputStreamClass.getMethod("sync"); + } catch (Exception ex1) { + String msg = "Neither hflush not sync were found. That seems to be " + + "a problem!"; + logger.error(msg); + throw new FlumeException(msg, ex1); + } + } + } + return m; + } + + /** + * If hflush is available in this version of HDFS, then this method calls + * hflush, else it calls sync. + * @param os - The stream to flush/sync + * @throws IOException + */ + protected void hflushOrSync(FSDataOutputStream os) throws IOException { + try { + // At this point the refHflushOrSync cannot be null, + // since register method would have thrown if it was. + this.refHflushOrSync.invoke(os); + } catch (InvocationTargetException e) { + String msg = "Error while trying to hflushOrSync!"; + logger.error(msg); + Throwable cause = e.getCause(); + if(cause != null && cause instanceof IOException) { + throw (IOException)cause; + } + throw new FlumeException(msg, e); + } catch (Exception e) { + String msg = "Error while trying to hflushOrSync!"; + logger.error(msg); + throw new FlumeException(msg, e); + } + } } http://git-wip-us.apache.org/repos/asf/flume/blob/99077642/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java index dc93e4f..f128795 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java @@ -139,7 +139,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter { isFinished = true; } fsOut.flush(); - fsOut.sync(); + hflushOrSync(this.fsOut); } @Override @@ -151,7 +151,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter { isFinished = true; } fsOut.flush(); - fsOut.sync(); + hflushOrSync(fsOut); cmpOut.close(); if (compressor != null) { CodecPool.returnCompressor(compressor); http://git-wip-us.apache.org/repos/asf/flume/blob/99077642/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java index 6fa12eb..7054bfc 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java @@ -128,7 +128,7 @@ public class HDFSDataStream extends AbstractHDFSWriter { public void sync() throws IOException { serializer.flush(); outStream.flush(); - outStream.sync(); + hflushOrSync(outStream); } @Override @@ -136,7 +136,7 @@ public class HDFSDataStream extends AbstractHDFSWriter { serializer.flush(); serializer.beforeClose(); outStream.flush(); - outStream.sync(); + hflushOrSync(outStream); outStream.close(); unregisterCurrentStream(); http://git-wip-us.apache.org/repos/asf/flume/blob/99077642/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java index 2608987..a261cce 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java @@ -110,7 +110,7 @@ public class HDFSSequenceFile extends AbstractHDFSWriter { @Override public void sync() throws IOException { - writer.syncFs(); + hflushOrSync(outStream); } @Override
