This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit 9c5313e1f634ade92d645dd20ea8b92552a0a3d0
Author: Bhagavan Das <[email protected]>
AuthorDate: Thu Nov 28 11:50:44 2019 +0000

    [FLINK-14170][fs-connector] Support hadoop<2.7 in StreamFileSink when truncate() not used
---
 .../fs/hdfs/HadoopRecoverableFsDataOutputStream.java |  8 +++++++-
 .../runtime/fs/hdfs/HadoopRecoverableWriter.java     | 19 +++++++++++++++----
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index be0d134..05e8cd0 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
 import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
@@ -34,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.util.VersionInfo;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -174,7 +176,7 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 	}
 
 	private static void ensureTruncateInitialized() throws FlinkRuntimeException {
-		if (truncateHandle == null) {
+		if (HadoopUtils.isMinHadoopVersion(2, 7) && truncateHandle == null) {
 			Method truncateMethod;
 			try {
 				truncateMethod = FileSystem.class.getMethod("truncate", Path.class, long.class);
@@ -192,6 +194,10 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
 	}
 
 	private static boolean truncate(final FileSystem hadoopFs, final Path file, final long length) throws IOException {
+		if (!HadoopUtils.isMinHadoopVersion(2, 7)) {
+			throw new IllegalStateException("Truncation is not available in hadoop version < 2.7. You are on Hadoop " + VersionInfo.getVersion());
+		}
+
 		if (truncateHandle != null) {
 			try {
 				return (Boolean) truncateHandle.invoke(hadoopFs, file, length);
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
index 03d741b..e81e4d1 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -27,6 +27,10 @@ import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.util.HadoopUtils;
 
+import org.apache.hadoop.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.UUID;
 
@@ -40,6 +44,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class HadoopRecoverableWriter implements RecoverableWriter {
 
+	private static final Logger LOG = LoggerFactory.getLogger(HadoopRecoverableWriter.class);
+
 	/** The Hadoop file system on which the writer operates. */
 	private final org.apache.hadoop.fs.FileSystem fs;
 
@@ -50,13 +56,18 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
 	public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {
 		this.fs = checkNotNull(fs);
 
-		// This writer is only supported on a subset of file systems, and on
-		// specific versions. We check these schemes and versions eagerly for
-		// better error messages.
-		if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
+		// This writer is only supported on a subset of file systems
+		if (!"hdfs".equalsIgnoreCase(fs.getScheme())) {
 			throw new UnsupportedOperationException(
 					"Recoverable writers on Hadoop are only supported for HDFS and for Hadoop version 2.7 or newer");
 		}
+
+		// Part of functionality depends on specific versions. We check these schemes and versions eagerly for
+		// better error messages.
+		if (!HadoopUtils.isMinHadoopVersion(2, 7)) {
+			LOG.warn("WARNING: You are running on hadoop version " + VersionInfo.getVersion() + "."
+					+ " If your RollingPolicy does not roll on every checkpoint/savepoint, the StreamingFileSink will throw an exception upon recovery.");
+		}
 	}
 
 	@Override
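The guard used throughout the patch is HadoopUtils.isMinHadoopVersion(2, 7), which compares the Hadoop version on the classpath against a required major/minor. A rough sketch of what such a check amounts to follows; this is an illustration, not Flink's actual HadoopUtils code, and the class name is invented for the example.

    // Illustrative sketch only -- not Flink's actual HadoopUtils implementation.
    // Checks whether the running Hadoop version is at least major.minor, based on
    // the version string reported by the Hadoop client on the classpath.
    import org.apache.hadoop.util.VersionInfo;

    public final class HadoopVersionCheckSketch {

        // Assumes a "major.minor[.patch]" version string such as "2.6.5" or "2.7.3".
        public static boolean isMinHadoopVersion(int major, int minor) {
            String[] parts = VersionInfo.getVersion().split("\\.");
            int runningMajor = Integer.parseInt(parts[0]);
            int runningMinor = Integer.parseInt(parts[1]);
            return runningMajor > major || (runningMajor == major && runningMinor >= minor);
        }

        private HadoopVersionCheckSketch() {}
    }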
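The reason truncate() goes through reflection at all is that FileSystem.truncate(Path, long) only exists from Hadoop 2.7 onwards, so a direct call would not compile or link against older client jars. A minimal sketch of the resolve-once-then-invoke pattern the patch guards (names mirror the patch; error handling is trimmed and the wrapper class is invented for the example):

    // Minimal sketch of the reflective truncate pattern behind the patch.
    // On Hadoop < 2.7 the getMethod() lookup throws NoSuchMethodException,
    // which is why the patch now skips initialization entirely there.
    import java.lang.reflect.Method;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public final class TruncateSketch {

        private static Method truncateHandle;

        private static void ensureTruncateInitialized() throws NoSuchMethodException {
            if (truncateHandle == null) {
                truncateHandle = FileSystem.class.getMethod("truncate", Path.class, long.class);
            }
        }

        // Returns true when the file already has its new length after the call;
        // false means HDFS is still adjusting the last block in the background.
        public static boolean truncate(FileSystem fs, Path file, long length) throws Exception {
            ensureTruncateInitialized();
            return (Boolean) truncateHandle.invoke(fs, file, length);
        }

        private TruncateSketch() {}
    }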
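The warning added to HadoopRecoverableWriter spells out the operational contract on pre-2.7 clusters: recovery only needs truncate() when an in-progress part file can outlive a checkpoint, so a policy that rolls on every checkpoint avoids it. A hedged usage sketch follows; it is not part of this commit, and the output path and encoder are placeholders.

    // Usage sketch: a StreamingFileSink that is safe to recover on Hadoop < 2.7
    // because OnCheckpointRollingPolicy rolls part files on every checkpoint,
    // so recovery never has to truncate an in-progress file.
    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

    public final class Pre27SinkSketch {

        public static StreamingFileSink<String> checkpointRollingSink() {
            // "hdfs:///tmp/output" is a placeholder output directory.
            return StreamingFileSink
                    .forRowFormat(new Path("hdfs:///tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
                    .withRollingPolicy(OnCheckpointRollingPolicy.build())
                    .build();
        }

        private Pre27SinkSketch() {}
    }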
