Updated Branches: refs/heads/flume-1.4 543a5a196 -> 5f03e1d51
FLUME-2027. Check for default replication fails on federated cluster in hdfs sink (Mike Percy via Hari Shreedharan) Project: http://git-wip-us.apache.org/repos/asf/flume/repo Commit: http://git-wip-us.apache.org/repos/asf/flume/commit/5f03e1d5 Tree: http://git-wip-us.apache.org/repos/asf/flume/tree/5f03e1d5 Diff: http://git-wip-us.apache.org/repos/asf/flume/diff/5f03e1d5 Branch: refs/heads/flume-1.4 Commit: 5f03e1d513bb3a041d7cd2be02816525db55b1d9 Parents: 543a5a1 Author: Hari Shreedharan <[email protected]> Authored: Mon May 6 20:44:57 2013 -0700 Committer: Hari Shreedharan <[email protected]> Committed: Mon May 6 20:47:21 2013 -0700 ---------------------------------------------------------------------- .../apache/flume/sink/hdfs/AbstractHDFSWriter.java | 57 ++++++++++++++- .../flume/sink/hdfs/HDFSCompressedDataStream.java | 2 +- .../org/apache/flume/sink/hdfs/HDFSDataStream.java | 2 +- .../apache/flume/sink/hdfs/HDFSSequenceFile.java | 2 +- 4 files changed, 56 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flume/blob/5f03e1d5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java index ff4f223..bc3b383 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/AbstractHDFSWriter.java @@ -23,6 +23,7 @@ import org.apache.flume.annotations.InterfaceAudience; import org.apache.flume.annotations.InterfaceStability; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -39,7 +40,9 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { private FSDataOutputStream outputStream; private FileSystem fs; + private Path destPath; private Method refGetNumCurrentReplicas = null; + private Method refGetDefaultReplication = null; private Integer configuredMinReplicas = null; final static Object [] NO_ARGS = new Object []{}; @@ -84,26 +87,43 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { } protected void registerCurrentStream(FSDataOutputStream outputStream, - FileSystem fs) { + FileSystem fs, Path destPath) { Preconditions.checkNotNull(outputStream, "outputStream must not be null"); Preconditions.checkNotNull(fs, "fs must not be null"); + Preconditions.checkNotNull(destPath, "destPath must not be null"); this.outputStream = outputStream; this.fs = fs; + this.destPath = destPath; this.refGetNumCurrentReplicas = reflectGetNumCurrentReplicas(outputStream); + this.refGetDefaultReplication = reflectGetDefaultReplication(fs); } protected void unregisterCurrentStream() { this.outputStream = null; this.fs = null; + this.destPath = null; this.refGetNumCurrentReplicas = null; + this.refGetDefaultReplication = null; } public int getFsDesiredReplication() { - if (fs != null) { - return fs.getDefaultReplication(); + short replication = 0; + if (fs != null && destPath != null) { + if (refGetDefaultReplication != null) { + try { + replication = (Short) refGetDefaultReplication.invoke(fs, destPath); + } catch (IllegalAccessException e) { + logger.warn("Unexpected error calling getDefaultReplication(Path)", e); + } catch (InvocationTargetException e) { + logger.warn("Unexpected error calling getDefaultReplication(Path)", e); + } + } else { + // will not work on Federated HDFS (see HADOOP-8014) + replication = fs.getDefaultReplication(); + } } - return 0; + return replication; } /** @@ -163,4 +183,33 @@ public abstract class AbstractHDFSWriter implements HDFSWriter { return m; } + /** + * Find the 'getDefaultReplication' method on the passed <code>fs</code> + * FileSystem that takes a Path argument. + * @return Method or null. + */ + private Method reflectGetDefaultReplication(FileSystem fileSystem) { + Method m = null; + if (fileSystem != null) { + Class<?> fsClass = fileSystem.getClass(); + try { + m = fsClass.getMethod("getDefaultReplication", + new Class<?>[] { Path.class }); + } catch (NoSuchMethodException e) { + logger.debug("FileSystem implementation doesn't support" + + " getDefaultReplication(Path); -- HADOOP-8014 not available; " + + "className = " + fsClass.getName() + "; err = " + e); + } catch (SecurityException e) { + logger.debug("No access to getDefaultReplication(Path) on " + + "FileSystem implementation -- HADOOP-8014 not available; " + + "className = " + fsClass.getName() + "; err = " + e); + } + } + if (m != null) { + logger.debug("Using FileSystem.getDefaultReplication(Path) from " + + "HADOOP-8014"); + } + return m; + } + } http://git-wip-us.apache.org/repos/asf/flume/blob/5f03e1d5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java index 0c618b5..2c2be6a 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSCompressedDataStream.java @@ -102,7 +102,7 @@ public class HDFSCompressedDataStream extends AbstractHDFSWriter { + ") does not support append"); } - registerCurrentStream(fsOut, hdfs); + registerCurrentStream(fsOut, hdfs, dstPath); if (appending) { serializer.afterReopen(); http://git-wip-us.apache.org/repos/asf/flume/blob/5f03e1d5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java index c87fafe..b8214be 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSDataStream.java @@ -90,7 +90,7 @@ public class HDFSDataStream extends AbstractHDFSWriter { } // must call superclass to check for replication issues - registerCurrentStream(outStream, hdfs); + registerCurrentStream(outStream, hdfs, dstPath); if (appending) { serializer.afterReopen(); http://git-wip-us.apache.org/repos/asf/flume/blob/5f03e1d5/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java ---------------------------------------------------------------------- diff --git a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java index 1a401d6..0383744 100644 --- a/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java +++ b/flume-ng-sinks/flume-hdfs-sink/src/main/java/org/apache/flume/sink/hdfs/HDFSSequenceFile.java @@ -92,7 +92,7 @@ public class HDFSSequenceFile extends AbstractHDFSWriter { writer = SequenceFile.createWriter(conf, outStream, serializer.getKeyClass(), serializer.getValueClass(), compType, codeC); - registerCurrentStream(outStream, hdfs); + registerCurrentStream(outStream, hdfs, dstPath); } @Override
