This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e9bf48f69f2a9a260ef0c01e358a301a8a23f07c
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Mon Sep 11 11:29:53 2023 +0200

    [FLINK-33059][connectors][format][filesystem] Some FileInputFormats don't use FileInputFormat#createSplits. If input files are compressed, ensure that the size of the split is not the compressed file size and that the compression decorator is called.
---
 .../flink/api/common/io/FileInputFormat.java       |  2 +
 .../flink/api/common/io/FileInputFormatTest.java   | 89 ++++++++++++++++++++--
 .../org/apache/flink/testutils/TestFileUtils.java  |  8 +-
 3 files changed, 88 insertions(+), 11 deletions(-)

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 6a2a78e6cc2..04844512001 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.compression.Bzip2InputStreamFactory;
 import org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory;
 import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;
@@ -157,6 +158,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
         }
     }
 
+    @VisibleForTesting
     public static Set<String> getSupportedCompressionFormats() {
         return INFLATER_INPUT_STREAM_FACTORIES.keySet();
     }
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 09b4607c548..920e3be52fc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.api.common.io;
 
-import java.util.Set;
-
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.testutils.TestFileUtils;
@@ -42,8 +41,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.Set;
 
 /** Tests for the FileInputFormat */
 public class FileInputFormatTest {
@@ -597,15 +599,16 @@ public class FileInputFormatTest {
     // ---- Tests for compressed files ---------
 
     /**
-     * Create directory with files with .deflate extension and see if it creates a split for each
-     * file. Each split has to start from the beginning.
+     * Create directory with compressed files and see if it creates a split for each file. Each
+     * split has to start from the beginning.
      */
     @Test
     public void testFileInputFormatWithCompression() {
         try {
             String tempFile =
-                    TestFileUtils.createTempTextFileDirCompressionFormats(
-                            temporaryFolder.newFolder());
+                    TestFileUtils.createTempFileDirForProvidedFormats(
+                            temporaryFolder.newFolder(),
+                            FileInputFormat.getSupportedCompressionFormats());
             final DummyFileInputFormat format = new DummyFileInputFormat();
             format.setFilePath(tempFile);
             format.configure(new Configuration());
@@ -652,6 +655,61 @@ public class FileInputFormatTest {
         }
     }
 
+    /**
+     * Some FileInputFormats don't use FileInputFormat#createSplits (that would detect that the file
+     * is non-splittable and deal with reading boundaries correctly), they all create splits
+     * manually from FileSourceSplit. If input files are compressed, ensure that the size of the
+     * split is not the compressed file size and that the compression decorator is called.
+     */
+    @Test
+    public void testFileInputFormatWithCompressionFromFileSource() {
+        try {
+            String tempFile =
+                    TestFileUtils.createTempFileDirForProvidedFormats(
+                            temporaryFolder.newFolder(),
+                            FileInputFormat.getSupportedCompressionFormats());
+            DummyFileInputFormat format = new DummyFileInputFormat();
+            format.setFilePath(tempFile);
+            format.configure(new Configuration());
+
+            // manually create a FileInputSplit per file as FileSource would do
+            // see org.apache.flink.connector.file.table.DeserializationSchemaAdapter.Reader()
+            List<FileInputSplit> splits = manuallyCreateSplits(tempFile);
+            final Set<String> supportedCompressionFormats =
+                    FileInputFormat.getSupportedCompressionFormats();
+            // one file per compression format, one split per file
+            Assert.assertEquals(supportedCompressionFormats.size(), splits.size());
+            for (FileInputSplit split : splits) {
+                Assert.assertEquals(0L, split.getStart()); // always read from the beginning.
+                format.open(split);
+                Assert.assertTrue(format.compressedRead);
+                Assert.assertEquals(
+                        FileInputFormat.READ_WHOLE_SPLIT_FLAG,
+                        format.getSplitLength()); // unsplittable compressed files have this size
+                // as flag for "read whole file"
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            Assert.fail(ex.getMessage());
+        }
+    }
+
+    /**
+     * Simulates splits created by org.apache.flink.connector.file.src.FileSource (one split per
+     * file with length = size of the file). For compressed file, the input format should override
+     * it when it detects that the file is unsplittable in {@link
+     * FileInputFormat#open(FileInputSplit)}.
+     */
+    private List<FileInputSplit> manuallyCreateSplits(String pathString) throws IOException {
+        List<FileInputSplit> splits = new ArrayList<>();
+        final Path path = new Path(pathString);
+        final FileSystem fs = path.getFileSystem();
+        for (FileStatus file : fs.listStatus(path)) {
+            // split created like in DeserializationSchemaAdapter.Reader()
+            splits.add(new FileInputSplit(0, file.getPath(), 0, file.getLen(), null));
+        }
+        return splits;
+    }
     // ------------------------------------------------------------------------
     //  Ignored Files
     // ------------------------------------------------------------------------
@@ -851,8 +909,9 @@ public class FileInputFormatTest {
         }
     }
 
-    private class DummyFileInputFormat extends FileInputFormat<IntValue> {
+    private static class DummyFileInputFormat extends FileInputFormat<IntValue> {
         private static final long serialVersionUID = 1L;
+        private boolean compressedRead = false;
 
         @Override
         public boolean reachedEnd() throws IOException {
@@ -863,6 +922,22 @@ public class FileInputFormatTest {
         public IntValue nextRecord(IntValue record) throws IOException {
             return null;
         }
+
+        @Override
+        public void open(FileInputSplit split) throws IOException {
+            compressedRead = false;
+            super.open(split);
+        }
+
+        @Override
+        protected FSDataInputStream decorateInputStream(
+                FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable {
+            compressedRead =
+                    getInflaterInputStreamFactory(
+                                    extractFileExtension(fileSplit.getPath().getName()))
+                            != null;
+            return inputStream;
+        }
     }
 
     private class MultiDummyFileInputFormat extends DummyFileInputFormat {
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
index 9dedf0cc37c..df87a653c37 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
@@ -25,8 +25,7 @@ import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.file.Files;
-
-import org.apache.flink.api.common.io.FileInputFormat;
+import java.util.Set;
 
 public class TestFileUtils {
 
@@ -138,7 +137,8 @@ public class TestFileUtils {
         return f.toURI().toString();
     }
 
-    public static String createTempTextFileDirCompressionFormats(File tempDir) throws IOException {
+    public static String createTempFileDirForProvidedFormats(File tempDir, Set<String> formats)
+            throws IOException {
         File f = null;
         do {
             f = new File(tempDir, randomFileName(FILE_SUFFIX));
@@ -146,7 +146,7 @@ public class TestFileUtils {
         f.mkdirs();
         f.deleteOnExit();
 
-        for (String extension : FileInputFormat.getSupportedCompressionFormats()) {
+        for (String extension : formats) {
             File child = new File(f, randomFileName("." + extension));
             child.deleteOnExit();
 