This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit c83757d33208bc64462bbb224ec0d2e4089d10f8
Author: Kostas Kloudas <[email protected]>
AuthorDate: Tue Dec 17 13:55:43 2019 +0100

    [FLINK-14170][fs-connector] Add tests for hadoop<2.7 support in StreamingFileSink
---
 .../org/apache/flink/runtime/util/HadoopUtils.java |  25 +++-
 ...leWriterOldHadoopWithNoTruncateSupportTest.java | 167 +++++++++++++++++++++
 2 files changed, 189 insertions(+), 3 deletions(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index 5a17f13..599ea4e 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.util;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -127,9 +128,28 @@ public class HadoopUtils {
        }
 
        /**
-        * Checks if the Hadoop dependency is at least of the given version.
+        * Checks if the Hadoop dependency is at least the given version.
         */
        public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException {
+               final Tuple2<Integer, Integer> hadoopVersion = getMajorMinorBundledHadoopVersion();
+               int maj = hadoopVersion.f0;
+               int min = hadoopVersion.f1;
+
+               return maj > major || (maj == major && min >= minor);
+       }
+
+       /**
+        * Checks if the Hadoop dependency is strictly below the given version.
+        */
+       public static boolean isMaxHadoopVersion(int major, int minor) throws FlinkRuntimeException {
+               final Tuple2<Integer, Integer> hadoopVersion = getMajorMinorBundledHadoopVersion();
+               int maj = hadoopVersion.f0;
+               int min = hadoopVersion.f1;
+
+               return maj < major || (maj == major && min < minor);
+       }
+
+       private static Tuple2<Integer, Integer> getMajorMinorBundledHadoopVersion() {
                String versionString = VersionInfo.getVersion();
                String[] versionParts = versionString.split("\\.");
 
@@ -140,7 +160,6 @@ public class HadoopUtils {
 
                int maj = Integer.parseInt(versionParts[0]);
                int min = Integer.parseInt(versionParts[1]);
-
-               return maj > major || (maj == major && min >= minor);
+               return Tuple2.of(maj, min);
        }
 }
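
For reference, the two guards above partition the bundled-Hadoop version space: isMinHadoopVersion(2, 7) is an inclusive lower bound (true for 2.7.0 and later), while isMaxHadoopVersion(2, 7) is an exclusive upper bound (true strictly below 2.7.0), so exactly one of them holds for any parsed (major, minor) pair. A minimal standalone sketch of the same comparison logic (class and method names here are illustrative, not part of the patch):

    final class VersionGuardSketch {

        // Inclusive lower bound: for (2, 7) this is true for 2.7.x, 2.8.x and 3.x.
        static boolean isAtLeast(int maj, int min, int major, int minor) {
            return maj > major || (maj == major && min >= minor);
        }

        // Exclusive upper bound: for (2, 7) this is true for 2.6.x and below,
        // and false for 2.7.x itself.
        static boolean isBelow(int maj, int min, int major, int minor) {
            return maj < major || (maj == major && min < minor);
        }
    }
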
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java
new file mode 100644
index 0000000..b12090f
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertNotNull;
+import static junit.framework.TestCase.assertTrue;
+import static junit.framework.TestCase.fail;
+
+/**
+ * Tests for the {@link HadoopRecoverableWriter} with Hadoop versions earlier than 2.7.
+ * Contains tests showing that the writer can be used with pre-2.7 versions as long
+ * as the {@code truncate()} functionality of the underlying FS is not used.
+ */
+public class HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest {
+
+       @ClassRule
+       public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+       private static MiniDFSCluster hdfsCluster;
+
+       /** The cached file system instance. */
+       private static FileSystem fileSystem;
+
+       private static Path basePath;
+
+       @BeforeClass
+       public static void testHadoopVersion() {
+               Assume.assumeTrue(HadoopUtils.isMaxHadoopVersion(2, 7));
+       }
+
+       @BeforeClass
+       public static void verifyOS() {
+               Assume.assumeTrue("HDFS cluster cannot be started on Windows 
without extensions.", !OperatingSystem.isWindows());
+       }
+
+       @BeforeClass
+       public static void createHDFS() throws Exception {
+               final File baseDir = TEMP_FOLDER.newFolder();
+
+               final Configuration hdConf = new Configuration();
+               hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
+               final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+               hdfsCluster = builder.build();
+
+               final org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
+
+               fileSystem = new HadoopFileSystem(hdfs);
+               basePath = new Path(hdfs.getUri() + "/tests");
+       }
+
+       @AfterClass
+       public static void destroyHDFS() throws Exception {
+               if (hdfsCluster != null) {
+                       hdfsCluster.getFileSystem().delete(new org.apache.hadoop.fs.Path(basePath.toUri()), true);
+                       hdfsCluster.shutdown();
+               }
+       }
+
+       @Test
+       public void testWriteAndCommitWorks() throws IOException {
+               final Path testPath = new Path(basePath, "test-0");
+               final String expectedContent = "test_line";
+
+               final RecoverableWriter writerUnderTest = fileSystem.createRecoverableWriter();
+               final RecoverableFsDataOutputStream streamUnderTest =
+                               getOpenStreamToFileWithContent(writerUnderTest, testPath, expectedContent);
+               streamUnderTest.closeForCommit().commit();
+
+               verifyFileContent(testPath, expectedContent);
+       }
+
+       @Test
+       public void testRecoveryAfterClosingForCommitWorks() throws IOException {
+               final Path testPath = new Path(basePath, "test-1");
+               final String expectedContent = "test_line";
+
+               final RecoverableWriter writerUnderTest = fileSystem.createRecoverableWriter();
+               final RecoverableFsDataOutputStream streamUnderTest =
+                               getOpenStreamToFileWithContent(writerUnderTest, testPath, expectedContent);
+
+               final RecoverableWriter.CommitRecoverable committable =
+                               streamUnderTest.closeForCommit().getRecoverable();
+
+               writerUnderTest.recoverForCommit(committable).commitAfterRecovery();
+
+               verifyFileContent(testPath, expectedContent);
+       }
+
+       @Test
+       public void testExceptionThrownWhenRecoveringWithInProgressFile() throws IOException {
+               final RecoverableWriter writerUnderTest = fileSystem.createRecoverableWriter();
+               final RecoverableFsDataOutputStream stream = writerUnderTest.open(new Path(basePath, "test-2"));
+               final RecoverableWriter.ResumeRecoverable recoverable = stream.persist();
+               assertNotNull(recoverable);
+
+               try {
+                       writerUnderTest.recover(recoverable);
+                       fail("recover() of an in-progress file should fail on Hadoop < 2.7");
+               } catch (IOException e) {
+                       // this is the expected exception and we also check that the root cause is the hadoop < 2.7 version
+                       assertTrue(e.getCause() instanceof IllegalStateException);
+               }
+       }
+
+       private RecoverableFsDataOutputStream getOpenStreamToFileWithContent(
+                       final RecoverableWriter writerUnderTest,
+                       final Path path,
+                       final String expectedContent) throws IOException {
+
+               final byte[] content = expectedContent.getBytes(UTF_8);
+
+               final RecoverableFsDataOutputStream streamUnderTest = writerUnderTest.open(path);
+               streamUnderTest.write(content);
+               return streamUnderTest;
+       }
+
+       private static void verifyFileContent(final Path testPath, final String expectedContent) throws IOException {
+               try (FSDataInputStream in = fileSystem.open(testPath);
+                               InputStreamReader ir = new InputStreamReader(in, UTF_8);
+                               BufferedReader reader = new BufferedReader(ir)) {
+
+                       final String line = reader.readLine();
+                       assertEquals(expectedContent, line);
+               }
+       }
+}
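
Background on the expected failure in testExceptionThrownWhenRecoveringWithInProgressFile: resuming an in-progress part file means cutting it back to the length captured by the last persist(), and the FileSystem#truncate() API used for that only exists since Hadoop 2.7. A rough sketch of that recovery step, assuming a 2.7+ client (simplified for illustration; this is not the actual Flink internal code):

    import java.io.IOException;

    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    final class TruncateRecoverySketch {

        // Cut the part file back to the last persisted length. Hadoop < 2.7
        // has no FileSystem#truncate(), which is why recovering an
        // in-progress file is rejected there, surfacing as the
        // IllegalStateException checked in the test above.
        static void truncateToPersistedLength(FileSystem fs, Path file, long persistedLength) throws IOException {
            final boolean done = fs.truncate(file, persistedLength);
            if (!done) {
                // truncate() returned false: HDFS completes the block
                // recovery asynchronously, and the caller must wait for it
                // before reopening the file for append.
            }
        }
    }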
