This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 9c5313e1f634ade92d645dd20ea8b92552a0a3d0
Author: Bhagavan Das <[email protected]>
AuthorDate: Thu Nov 28 11:50:44 2019 +0000

    [FLINK-14170][fs-connector] Support hadoop < 2.7 in StreamingFileSink when truncate() is not used
---
 .../fs/hdfs/HadoopRecoverableFsDataOutputStream.java  |  8 +++++++-
 .../runtime/fs/hdfs/HadoopRecoverableWriter.java      | 19 +++++++++++++++----
 2 files changed, 22 insertions(+), 5 deletions(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
index be0d134..05e8cd0 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableFsDataOutputStream.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Deadline;
 import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
 import org.apache.flink.core.fs.RecoverableWriter.CommitRecoverable;
 import org.apache.flink.core.fs.RecoverableWriter.ResumeRecoverable;
+import org.apache.flink.runtime.util.HadoopUtils;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkRuntimeException;
 import org.apache.flink.util.IOUtils;
@@ -34,6 +35,7 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
+import org.apache.hadoop.util.VersionInfo;
 
 import java.io.FileNotFoundException;
 import java.io.IOException;
@@ -174,7 +176,7 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
        }
 
        private static void ensureTruncateInitialized() throws FlinkRuntimeException {
-               if (truncateHandle == null) {
+               if (HadoopUtils.isMinHadoopVersion(2, 7) && truncateHandle == null) {
                        Method truncateMethod;
                        try {
                                truncateMethod = FileSystem.class.getMethod("truncate", Path.class, long.class);
@@ -192,6 +194,10 @@ class HadoopRecoverableFsDataOutputStream extends RecoverableFsDataOutputStream
        }
 
        private static boolean truncate(final FileSystem hadoopFs, final Path file, final long length) throws IOException {
+               if (!HadoopUtils.isMinHadoopVersion(2, 7)) {
+                       throw new IllegalStateException("Truncation is not available in hadoop version < 2.7. You are on Hadoop " + VersionInfo.getVersion());
+               }
+
                if (truncateHandle != null) {
                        try {
                                return (Boolean) truncateHandle.invoke(hadoopFs, file, length);
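
For context, here is a minimal sketch of the version gate the hunks above rely on. It is illustrative only, not the actual HadoopUtils.isMinHadoopVersion implementation, and it assumes VersionInfo.getVersion() returns a plain dotted string such as "2.6.5".

    import org.apache.hadoop.util.VersionInfo;

    final class HadoopVersionGateSketch {

        // True if the Hadoop runtime version is at least major.minor
        // (illustrative parsing; unusual vendor version strings are not handled).
        static boolean isAtLeast(int major, int minor) {
            String[] parts = VersionInfo.getVersion().split("\\.");
            int runtimeMajor = Integer.parseInt(parts[0]);
            int runtimeMinor = parts.length > 1 ? Integer.parseInt(parts[1]) : 0;
            return runtimeMajor > major || (runtimeMajor == major && runtimeMinor >= minor);
        }

        public static void main(String[] args) {
            if (!isAtLeast(2, 7)) {
                // Mirrors the guarded paths above: FileSystem.truncate() cannot be used on this runtime.
                System.out.println("Hadoop " + VersionInfo.getVersion() + ": truncate() is unavailable.");
            }
        }
    }

With such a gate in place, the reflection lookup of FileSystem#truncate is only attempted on Hadoop 2.7+, and pre-2.7 runtimes get the explicit IllegalStateException shown above only when truncation is actually requested.
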
diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
index 03d741b..e81e4d1 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriter.java
@@ -27,6 +27,10 @@ import org.apache.flink.core.fs.RecoverableWriter;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.util.HadoopUtils;
 
+import org.apache.hadoop.util.VersionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 import java.io.IOException;
 import java.util.UUID;
 
@@ -40,6 +44,8 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
 @Internal
 public class HadoopRecoverableWriter implements RecoverableWriter {
 
+       private static final Logger LOG = LoggerFactory.getLogger(HadoopRecoverableWriter.class);
+
        /** The Hadoop file system on which the writer operates. */
        private final org.apache.hadoop.fs.FileSystem fs;
 
@@ -50,13 +56,18 @@ public class HadoopRecoverableWriter implements RecoverableWriter {
        public HadoopRecoverableWriter(org.apache.hadoop.fs.FileSystem fs) {
                this.fs = checkNotNull(fs);
 
-               // This writer is only supported on a subset of file systems, and on
-               // specific versions. We check these schemes and versions eagerly for
-               // better error messages.
-               if (!"hdfs".equalsIgnoreCase(fs.getScheme()) || !HadoopUtils.isMinHadoopVersion(2, 7)) {
+               // This writer is only supported on a subset of file systems
+               if (!"hdfs".equalsIgnoreCase(fs.getScheme())) {
                        throw new UnsupportedOperationException(
                                        "Recoverable writers on Hadoop are only 
supported for HDFS and for Hadoop version 2.7 or newer");
                }
+
+               // Part of functionality depends on specific versions. We check these schemes and versions eagerly for
+               // better error messages.
+               if (!HadoopUtils.isMinHadoopVersion(2, 7)) {
+                       LOG.warn("WARNING: You are running on hadoop version " + VersionInfo.getVersion() + "." +
+                                       " If your RollingPolicy does not roll on every checkpoint/savepoint, the StreamingFileSink will throw an exception upon recovery.");
+               }
        }
 
        @Override
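
As a usage note on the warning above: on Hadoop versions below 2.7 the writer can now be constructed, but recovery must never need truncate(). Below is a minimal sketch of a sink that rolls on every checkpoint, assuming the Flink 1.10 StreamingFileSink row-format API and a hypothetical output path.

    import org.apache.flink.api.common.serialization.SimpleStringEncoder;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
    import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

    public class Pre27SinkSketch {

        public static StreamingFileSink<String> buildSink() {
            // Rolling on every checkpoint means an in-progress file never has to be
            // truncated on restore, so the pre-2.7 IllegalStateException path is not hit.
            return StreamingFileSink
                    .forRowFormat(new Path("hdfs:///tmp/output"), new SimpleStringEncoder<String>("UTF-8"))
                    .withRollingPolicy(OnCheckpointRollingPolicy.build())
                    .build();
        }
    }

Any rolling policy that keeps part files in progress across checkpoints would still require truncation on recovery, which is exactly the situation the new log message warns about.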
