This is an automated email from the ASF dual-hosted git repository.

kkloudas pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git
commit c83757d33208bc64462bbb224ec0d2e4089d10f8
Author: Kostas Kloudas <[email protected]>
AuthorDate: Tue Dec 17 13:55:43 2019 +0100

    [FLINK-14170][fs-connector] Add tests for hadoop<2.7 support in StreamFileSink
---
 .../org/apache/flink/runtime/util/HadoopUtils.java |  25 +++-
 ...leWriterOldHadoopWithNoTruncateSupportTest.java | 165 +++++++++++++++++++++
 2 files changed, 187 insertions(+), 3 deletions(-)

diff --git a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
index 5a17f13..599ea4e 100644
--- a/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
+++ b/flink-filesystems/flink-hadoop-fs/src/main/java/org/apache/flink/runtime/util/HadoopUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.util;
 
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.util.FlinkRuntimeException;
 
@@ -127,9 +128,28 @@ public class HadoopUtils {
     }
 
     /**
-     * Checks if the Hadoop dependency is at least of the given version.
+     * Checks if the Hadoop dependency is at least the given version.
      */
     public static boolean isMinHadoopVersion(int major, int minor) throws FlinkRuntimeException {
+        final Tuple2<Integer, Integer> hadoopVersion = getMajorMinorBundledHadoopVersion();
+        int maj = hadoopVersion.f0;
+        int min = hadoopVersion.f1;
+
+        return maj > major || (maj == major && min >= minor);
+    }
+
+    /**
+     * Checks if the Hadoop dependency is strictly below the given version.
+     */
+    public static boolean isMaxHadoopVersion(int major, int minor) throws FlinkRuntimeException {
+        final Tuple2<Integer, Integer> hadoopVersion = getMajorMinorBundledHadoopVersion();
+        int maj = hadoopVersion.f0;
+        int min = hadoopVersion.f1;
+
+        return maj < major || (maj == major && min < minor);
+    }
+
+    private static Tuple2<Integer, Integer> getMajorMinorBundledHadoopVersion() {
         String versionString = VersionInfo.getVersion();
         String[] versionParts = versionString.split("\\.");
 
@@ -140,7 +160,6 @@ public class HadoopUtils {
         int maj = Integer.parseInt(versionParts[0]);
         int min = Integer.parseInt(versionParts[1]);
 
-
-        return maj > major || (maj == major && min >= minor);
+        return Tuple2.of(maj, min);
     }
 }
diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java
new file mode 100644
index 0000000..b12090f
--- /dev/null
+++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.fs.hdfs;
+
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.OperatingSystem;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.BufferedReader;
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStreamReader;
+
+import static java.nio.charset.StandardCharsets.UTF_8;
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertNotNull;
+import static junit.framework.TestCase.assertTrue;
+
+/**
+ * Tests for the {@link HadoopRecoverableWriter} with Hadoop versions older than Hadoop 2.7.
+ * Contains tests showing that the writer can be used with pre-2.7 versions as
+ * long as the {@code truncate()} functionality of the underlying FS is not needed.
+ */
+public class HadoopRecoverableWriterOldHadoopWithNoTruncateSupportTest {
+
+    @ClassRule
+    public static final TemporaryFolder TEMP_FOLDER = new TemporaryFolder();
+
+    private static MiniDFSCluster hdfsCluster;
+
+    /** The cached file system instance. */
+    private static FileSystem fileSystem;
+
+    private static Path basePath;
+
+    @BeforeClass
+    public static void testHadoopVersion() {
+        Assume.assumeTrue(HadoopUtils.isMaxHadoopVersion(2, 7));
+    }
+
+    @BeforeClass
+    public static void verifyOS() {
+        Assume.assumeTrue("HDFS cluster cannot be started on Windows without extensions.", !OperatingSystem.isWindows());
+    }
+
+    @BeforeClass
+    public static void createHDFS() throws Exception {
+        final File baseDir = TEMP_FOLDER.newFolder();
+
+        final Configuration hdConf = new Configuration();
+        hdConf.set(MiniDFSCluster.HDFS_MINIDFS_BASEDIR, baseDir.getAbsolutePath());
+
+        final MiniDFSCluster.Builder builder = new MiniDFSCluster.Builder(hdConf);
+        hdfsCluster = builder.build();
+
+        final org.apache.hadoop.fs.FileSystem hdfs = hdfsCluster.getFileSystem();
+
+        fileSystem = new HadoopFileSystem(hdfs);
+        basePath = new Path(hdfs.getUri() + "/tests");
+    }
+
+    @AfterClass
+    public static void destroyHDFS() throws Exception {
+        if (hdfsCluster != null) {
+            hdfsCluster.getFileSystem().delete(new org.apache.hadoop.fs.Path(basePath.toUri()), true);
+            hdfsCluster.shutdown();
+        }
+    }
+
+    @Test
+    public void testWriteAndCommitWorks() throws IOException {
+        final Path testPath = new Path(basePath, "test-0");
+        final String expectedContent = "test_line";
+
+        final RecoverableWriter writerUnderTest = fileSystem.createRecoverableWriter();
+        final RecoverableFsDataOutputStream streamUnderTest =
+                getOpenStreamToFileWithContent(writerUnderTest, testPath, expectedContent);
+        streamUnderTest.closeForCommit().commit();
+
+        verifyFileContent(testPath, expectedContent);
+    }
+
+    @Test
+    public void testRecoveryAfterClosingForCommitWorks() throws IOException {
+        final Path testPath = new Path(basePath, "test-1");
+        final String expectedContent = "test_line";
+
+        final RecoverableWriter writerUnderTest = fileSystem.createRecoverableWriter();
+        final RecoverableFsDataOutputStream streamUnderTest =
+                getOpenStreamToFileWithContent(writerUnderTest, testPath, expectedContent);
+
+        final RecoverableWriter.CommitRecoverable committable =
+                streamUnderTest.closeForCommit().getRecoverable();
+
+        writerUnderTest.recoverForCommit(committable).commitAfterRecovery();
+
+        verifyFileContent(testPath, expectedContent);
+    }
+
+    @Test
+    public void testExceptionThrownWhenRecoveringWithInProgressFile() throws IOException {
+        final RecoverableWriter writerUnderTest = fileSystem.createRecoverableWriter();
+        final RecoverableFsDataOutputStream stream = writerUnderTest.open(new Path(basePath, "test-2"));
+        final RecoverableWriter.ResumeRecoverable recoverable = stream.persist();
+        assertNotNull(recoverable);
+
+        try {
+            writerUnderTest.recover(recoverable);
+        } catch (IOException e) {
+            // this is the expected exception; also verify that the root cause reflects the Hadoop < 2.7 limitation
+            assertTrue(e.getCause() instanceof IllegalStateException);
+        }
+    }
+
+    private RecoverableFsDataOutputStream getOpenStreamToFileWithContent(
+            final RecoverableWriter writerUnderTest,
+            final Path path,
+            final String expectedContent) throws IOException {
+
+        final byte[] content = expectedContent.getBytes(UTF_8);
+
+        final RecoverableFsDataOutputStream streamUnderTest = writerUnderTest.open(path);
+        streamUnderTest.write(content);
+        return streamUnderTest;
+    }
+
+    private static void verifyFileContent(final Path testPath, final String expectedContent) throws IOException {
+        try (FSDataInputStream in = fileSystem.open(testPath);
+            InputStreamReader ir = new InputStreamReader(in, UTF_8);
+            BufferedReader reader = new BufferedReader(ir)) {
+
+            final String line = reader.readLine();
+            assertEquals(expectedContent, line);
+        }
+    }
+}
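
For reference, a minimal sketch of how the two version checks added to HadoopUtils are meant to be used, mirroring the @BeforeClass guard in the new test. The class and method names below are made up for illustration and are not part of the commit.

import org.apache.flink.runtime.util.HadoopUtils;

import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.Test;

/** Hypothetical example of gating a test class on the bundled Hadoop version. */
public class ExampleVersionGatedTest {

    @BeforeClass
    public static void checkHadoopVersion() {
        // isMaxHadoopVersion(2, 7) is true only for bundled Hadoop versions strictly below 2.7
        // (e.g. 2.6.x) and false for 2.7.x and later; isMinHadoopVersion(2, 7) is its complement.
        // Assume.assumeTrue() skips (rather than fails) the whole class when the check does not hold.
        Assume.assumeTrue(HadoopUtils.isMaxHadoopVersion(2, 7));
    }

    @Test
    public void runsOnlyOnPre27Hadoop() {
        // test logic that must not rely on FileSystem#truncate() would go here
    }
}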
