This is an automated email from the ASF dual-hosted git repository.

echauchot pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e9bf48f69f2a9a260ef0c01e358a301a8a23f07c
Author: Etienne Chauchot <echauc...@apache.org>
AuthorDate: Mon Sep 11 11:29:53 2023 +0200

    [FLINK-33059][connectors][format][filesystem] Some FileInputFormats don't
    use FileInputFormat#createSplits. If input files are compressed, ensure
    that the size of the split is not the compressed file size and that the
    compression decorator is called.
---
 .../flink/api/common/io/FileInputFormat.java       |  2 +
 .../flink/api/common/io/FileInputFormatTest.java   | 89 ++++++++++++++++++++--
 .../org/apache/flink/testutils/TestFileUtils.java  |  8 +-
 3 files changed, 88 insertions(+), 11 deletions(-)
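
The gist of the change: FileSource-style code paths build one FileInputSplit per file with the
compressed file size as the split length, and FileInputFormat#open(FileInputSplit) must still
detect that the file is unsplittable, replace that length with the READ_WHOLE_SPLIT_FLAG marker,
and wrap the stream with the matching decompression decorator. A minimal sketch of that guarantee
(not part of the patch; the file path and the NoOpFormat subclass below are made up for
illustration):

    import java.io.IOException;

    import org.apache.flink.api.common.io.FileInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.types.IntValue;

    public class CompressedSplitSketch {

        // Minimal concrete subclass, in the spirit of the test's DummyFileInputFormat.
        static class NoOpFormat extends FileInputFormat<IntValue> {
            @Override
            public boolean reachedEnd() {
                return true;
            }

            @Override
            public IntValue nextRecord(IntValue reuse) {
                return null;
            }
        }

        public static void main(String[] args) throws IOException {
            // Hypothetical gzip file; any extension returned by
            // FileInputFormat.getSupportedCompressionFormats() would do.
            Path path = new Path("file:///tmp/data.txt.gz");
            long compressedSize = path.getFileSystem().getFileStatus(path).getLen();

            // A FileSource-style split: offset 0, length = compressed file size.
            FileInputSplit split = new FileInputSplit(0, path, 0, compressedSize, null);

            NoOpFormat format = new NoOpFormat();
            format.setFilePath(path);
            format.configure(new Configuration());
            format.open(split);

            // open() detects the unsplittable compressed file and replaces the split
            // length with the "read whole file" flag instead of the compressed size.
            System.out.println(
                    format.getSplitLength() == FileInputFormat.READ_WHOLE_SPLIT_FLAG);
        }
    }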

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index 6a2a78e6cc2..04844512001 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.common.io;
 
 import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.api.common.io.compression.Bzip2InputStreamFactory;
 import org.apache.flink.api.common.io.compression.DeflateInflaterInputStreamFactory;
 import org.apache.flink.api.common.io.compression.GzipInflaterInputStreamFactory;
@@ -157,6 +158,7 @@ public abstract class FileInputFormat<OT> extends RichInputFormat<OT, FileInputS
         }
     }
 
+    @VisibleForTesting
     public static Set<String> getSupportedCompressionFormats() {
         return INFLATER_INPUT_STREAM_FACTORIES.keySet();
     }
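
The only production change above is the @VisibleForTesting annotation; the method itself already
exposed the file extensions registered in INFLATER_INPUT_STREAM_FACTORIES. A small usage sketch
(the exact extensions, e.g. "gz" or "bz2", depend on the factories registered in the Flink
version at hand):

    import java.util.Set;

    import org.apache.flink.api.common.io.FileInputFormat;

    public class ListCompressionFormats {
        public static void main(String[] args) {
            Set<String> extensions = FileInputFormat.getSupportedCompressionFormats();
            // The updated tests below create one temporary file per returned extension.
            extensions.forEach(ext -> System.out.println("registered extension: " + ext));
        }
    }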
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
index 09b4607c548..920e3be52fc 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java
@@ -18,14 +18,13 @@
 
 package org.apache.flink.api.common.io;
 
-import java.util.Set;
-
 import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics;
 import org.apache.flink.api.common.io.statistics.BaseStatistics;
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FSDataInputStream;
 import org.apache.flink.core.fs.FileInputSplit;
+import org.apache.flink.core.fs.FileStatus;
 import org.apache.flink.core.fs.FileSystem;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.testutils.TestFileUtils;
@@ -42,8 +41,11 @@ import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.List;
+import java.util.Set;
 
 /** Tests for the FileInputFormat */
 public class FileInputFormatTest {
@@ -597,15 +599,16 @@ public class FileInputFormatTest {
     // ---- Tests for compressed files  ---------
 
     /**
-     * Create directory with files with .deflate extension and see if it creates a split for each
-     * file. Each split has to start from the beginning.
+     * Create directory with compressed files and see if it creates a split for each file. Each
+     * split has to start from the beginning.
      */
     @Test
     public void testFileInputFormatWithCompression() {
         try {
             String tempFile =
-                    TestFileUtils.createTempTextFileDirCompressionFormats(
-                            temporaryFolder.newFolder());
+                    TestFileUtils.createTempFileDirForProvidedFormats(
+                            temporaryFolder.newFolder(),
+                            FileInputFormat.getSupportedCompressionFormats());
             final DummyFileInputFormat format = new DummyFileInputFormat();
             format.setFilePath(tempFile);
             format.configure(new Configuration());
@@ -652,6 +655,61 @@ public class FileInputFormatTest {
         }
     }
 
+    /**
+     * Some FileInputFormats don't use FileInputFormat#createSplits (that would detect that the file
+     * is non-splittable and deal with reading boundaries correctly), they all create splits
+     * manually from FileSourceSplit. If input files are compressed, ensure that the size of the
+     * split is not the compressed file size and that the compression decorator is called.
+     */
+    @Test
+    public void testFileInputFormatWithCompressionFromFileSource() {
+        try {
+            String tempFile =
+                    TestFileUtils.createTempFileDirForProvidedFormats(
+                            temporaryFolder.newFolder(),
+                            FileInputFormat.getSupportedCompressionFormats());
+            DummyFileInputFormat format = new DummyFileInputFormat();
+            format.setFilePath(tempFile);
+            format.configure(new Configuration());
+
+            // manually create a FileInputSplit per file as FileSource would do
+            // see org.apache.flink.connector.file.table.DeserializationSchemaAdapter.Reader()
+            List<FileInputSplit> splits = manuallyCreateSplits(tempFile);
+            final Set<String> supportedCompressionFormats =
+                    FileInputFormat.getSupportedCompressionFormats();
+            // one file per compression format, one split per file
+            Assert.assertEquals(supportedCompressionFormats.size(), splits.size());
+            for (FileInputSplit split : splits) {
+                Assert.assertEquals(0L, split.getStart()); // always read from the beginning.
+                format.open(split);
+                Assert.assertTrue(format.compressedRead);
+                Assert.assertEquals(
+                        FileInputFormat.READ_WHOLE_SPLIT_FLAG,
+                        format.getSplitLength()); // unsplittable compressed files have this size
+                // as flag for "read whole file"
+            }
+        } catch (Exception ex) {
+            ex.printStackTrace();
+            Assert.fail(ex.getMessage());
+        }
+    }
+
+    /**
+     * Simulates splits created by org.apache.flink.connector.file.src.FileSource (one split per
+     * file with length = size of the file). For compressed file, the input format should override
+     * it when it detects that the file is unsplittable in {@link
+     * FileInputFormat#open(FileInputSplit)}.
+     */
+    private List<FileInputSplit> manuallyCreateSplits(String pathString) throws IOException {
+        List<FileInputSplit> splits = new ArrayList<>();
+        final Path path = new Path(pathString);
+        final FileSystem fs = path.getFileSystem();
+        for (FileStatus file : fs.listStatus(path)) {
+            // split created like in DeserializationSchemaAdapter.Reader()
+            splits.add(new FileInputSplit(0, file.getPath(), 0, file.getLen(), null));
+        }
+        return splits;
+    }
     // ------------------------------------------------------------------------
     //  Ignored Files
     // ------------------------------------------------------------------------
@@ -851,8 +909,9 @@ public class FileInputFormatTest {
         }
     }
 
-    private class DummyFileInputFormat extends FileInputFormat<IntValue> {
+    private static class DummyFileInputFormat extends FileInputFormat<IntValue> {
         private static final long serialVersionUID = 1L;
+        private boolean compressedRead = false;
 
         @Override
         public boolean reachedEnd() throws IOException {
@@ -863,6 +922,22 @@ public class FileInputFormatTest {
         public IntValue nextRecord(IntValue record) throws IOException {
             return null;
         }
+
+        @Override
+        public void open(FileInputSplit split) throws IOException {
+            compressedRead = false;
+            super.open(split);
+        }
+
+        @Override
+        protected FSDataInputStream decorateInputStream(
+                FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable {
+            compressedRead =
+                    getInflaterInputStreamFactory(
+                                    extractFileExtension(fileSplit.getPath().getName()))
+                            != null;
+            return inputStream;
+        }
     }
 
     private class MultiDummyFileInputFormat extends DummyFileInputFormat {
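
For context, manuallyCreateSplits mirrors how FileSource-based readers hand work to a
FileInputFormat: one FileInputSplit per file, with the file length used verbatim as the split
length. A hedged sketch of that conversion, assuming the FileSourceSplit accessors
(path()/offset()/length()/hostnames()) from the flink-connector-files module:

    import org.apache.flink.connector.file.src.FileSourceSplit;
    import org.apache.flink.core.fs.FileInputSplit;

    final class SplitConversionSketch {
        // One FileInputSplit per FileSourceSplit, in the spirit of
        // DeserializationSchemaAdapter.Reader(); for a compressed file the length is the
        // compressed size, which is why FileInputFormat#open must re-detect unsplittable
        // files and fall back to reading the whole file.
        static FileInputSplit toFileInputSplit(FileSourceSplit split) {
            return new FileInputSplit(
                    0, split.path(), split.offset(), split.length(), split.hostnames());
        }
    }
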
diff --git a/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java b/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
index 9dedf0cc37c..df87a653c37 100644
--- a/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
+++ b/flink-core/src/test/java/org/apache/flink/testutils/TestFileUtils.java
@@ -25,8 +25,7 @@ import java.io.FileOutputStream;
 import java.io.FileWriter;
 import java.io.IOException;
 import java.nio.file.Files;
-
-import org.apache.flink.api.common.io.FileInputFormat;
+import java.util.Set;
 
 public class TestFileUtils {
 
@@ -138,7 +137,8 @@ public class TestFileUtils {
         return f.toURI().toString();
     }
 
-    public static String createTempTextFileDirCompressionFormats(File tempDir) throws IOException {
+    public static String createTempFileDirForProvidedFormats(File tempDir, Set<String> formats)
+            throws IOException {
         File f = null;
         do {
             f = new File(tempDir, randomFileName(FILE_SUFFIX));
@@ -146,7 +146,7 @@ public class TestFileUtils {
         f.mkdirs();
         f.deleteOnExit();
 
-        for (String extension : FileInputFormat.getSupportedCompressionFormats()) {
+        for (String extension : formats) {
             File child = new File(f, randomFileName("." + extension));
             child.deleteOnExit();
 
