This is an automated email from the ASF dual-hosted git repository.

gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 02f1695  [FLINK-20221] DelimitedInputFormat does not restore compressed splits
02f1695 is described below

commit 02f16958346483d91cd59cb902e0cce63fa5875f
Author: Gyula Fora <[email protected]>
AuthorDate: Wed Nov 18 14:25:05 2020 +0100

    [FLINK-20221] DelimitedInputFormat does not restore compressed splits
    
    Closes #14174
---
 .../flink/api/common/io/DelimitedInputFormat.java  | 54 ++++++++++++------
 .../flink/api/common/io/GenericCsvInputFormat.java | 12 ++--
 .../apache/flink/api/java/io/CsvInputFormat.java   |  4 +-
 .../flink/api/java/io/PojoCsvInputFormat.java      |  4 +-
 .../flink/api/java/io/PrimitiveInputFormat.java    |  4 +-
 .../flink/api/java/io/CsvInputFormatTest.java      | 39 +++++++++++--
 .../flink/api/java/io/RowCsvInputFormatTest.java   |  2 +-
 .../flink/api/java/io/TextInputFormatTest.java     | 64 ++++++++++++++++++++++
 .../flink/test/io/RichInputOutputITCase.java       |  4 +-
 9 files changed, 149 insertions(+), 38 deletions(-)
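
For orientation, here is a minimal sketch (not part of the commit) of the checkpoint-and-reopen cycle this fix targets, following the same calls exercised by the new TextInputFormatTest below; the file path, class name, and the lack of error handling are assumptions made purely for illustration:

    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;

    public class ReopenSketch {
        public static void main(String[] args) throws Exception {
            // Hypothetical input file; with a registered compression extension it
            // would be read as a single unsplittable (compressed) split.
            TextInputFormat format = new TextInputFormat(new Path("file:///tmp/input.txt"));
            Configuration conf = new Configuration();
            format.configure(conf);

            FileInputSplit[] splits = format.createInputSplits(1);
            format.open(splits[0]);
            String first = format.nextRecord("");      // read one record
            Long state = format.getCurrentState();     // offset to checkpoint
            format.close();

            // Restore: a fresh instance is reopened at the saved offset. Per the
            // JIRA title, this step previously did not work for compressed splits.
            TextInputFormat restored = new TextInputFormat(new Path("file:///tmp/input.txt"));
            restored.configure(conf);
            restored.reopen(splits[0], state);
            String second = restored.nextRecord(first); // continues after `first`
            System.out.println(first + " / " + second);
            restored.close();
        }
    }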

diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 0c09a4c..a69b5bd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -32,6 +32,8 @@ import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
@@ -482,6 +484,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
                } else {
                        fillBuffer(0);
                }
+               initializeSplit(split, null);
        }
 
        private void initBuffers() {
@@ -738,27 +741,44 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
                Preconditions.checkArgument(state == -1 || state >= split.getStart(),
                        " Illegal offset "+ state +", smaller than the splits start=" + split.getStart());
 
-               try {
+               // If we are already at the end of the split, just return
+               if (split.getLength() != -1 && state > split.getStart() + split.getLength()) {
+                       this.end = true;
+                       return;
+               }
+
+               // If the checkpointed offset is at the beginning of the split we fall back to the regular
+               // open logic
+               if (split.getStart() == state) {
                        this.open(split);
-               } finally {
-                       this.offset = state;
+                       return;
                }
 
-               if (state > this.splitStart + split.getLength()) {
-                       this.end = true;
-               } else if (state > split.getStart()) {
-                       initBuffers();
+               // Otherwise we have to seek to the checkpointed offset and start reading from there
+               super.open(split);
+               this.offset = state;
 
-                       this.stream.seek(this.offset);
-                       if (split.getLength() == -1) {
-                               // this is the case for unsplittable files
-                               fillBuffer(0);
-                       } else {
-                               this.splitLength = this.splitStart + split.getLength() - this.offset;
-                               if (splitLength <= 0) {
-                                       this.end = true;
-                               }
-                       }
+               initBuffers();
+
+               this.stream.seek(this.offset);
+               if (split.getLength() == -1) {
+                       // this is the case for unsplittable files
+                       fillBuffer(0);
+               } else {
+                       this.splitLength = this.splitStart + split.getLength() - this.offset;
                }
+
+               initializeSplit(split, state);
+       }
+
+       /**
+        * Initialization method that is called after opening or reopening an input split.
+        *
+        * @param split Split that was opened or reopened
+        * @param state Checkpointed state if the split was reopened
+        * @throws IOException
+        */
+       protected void initializeSplit(FileInputSplit split, @Nullable Long state) throws IOException {
+
        }
 }
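
The hunks that follow move GenericCsvInputFormat, CsvInputFormat, PojoCsvInputFormat, PrimitiveInputFormat, and the test format in RichInputOutputITCase from overriding open() to overriding this new initializeSplit() hook, so their per-split setup also runs on reopen(). As a rough, hypothetical illustration (class and field names invented, not part of the commit), a custom format would now look like:

    import org.apache.flink.api.common.io.DelimitedInputFormat;
    import org.apache.flink.core.fs.FileInputSplit;

    import javax.annotation.Nullable;

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    public class UpperCaseLineFormat extends DelimitedInputFormat<String> {

        private transient boolean splitInitialized;

        @Override
        protected void initializeSplit(FileInputSplit split, @Nullable Long state) throws IOException {
            super.initializeSplit(split, state);
            // Runs for a fresh open(split) (state == null) as well as for a
            // reopen(split, state) after a checkpoint restore (state != null).
            splitInitialized = true;
        }

        @Override
        public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) throws IOException {
            return new String(bytes, offset, numBytes, StandardCharsets.UTF_8).toUpperCase();
        }
    }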
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 3b65a42..068890f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -300,14 +300,14 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
        // --------------------------------------------------------------------------------------------
        //  Runtime methods
        // --------------------------------------------------------------------------------------------
-       
+
        @Override
-       public void open(FileInputSplit split) throws IOException {
-               super.open(split);
+       protected void initializeSplit(FileInputSplit split, Long offset) throws IOException {
+               super.initializeSplit(split, offset);
 
                // instantiate the parsers
                FieldParser<?>[] parsers = new FieldParser<?>[fieldTypes.length];
-               
+
                for (int i = 0; i < fieldTypes.length; i++) {
                        if (fieldTypes[i] != null) {
                                Class<? extends FieldParser<?>> parserType = FieldParser.getParserForType(fieldTypes[i]);
@@ -330,9 +330,9 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
                        }
                }
                this.fieldParsers = parsers;
-               
+
                // skip the first line, if we are at the beginning of a file and have the option set
-               if (this.skipFirstLineAsHeader && this.splitStart == 0) {
+               if (this.skipFirstLineAsHeader && ((offset == null && split.getStart() == 0) || (offset != null && offset == 0))) {
                        readLine(); // read and ignore
                }
        }
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index 84c9af1..df71611 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -50,8 +50,8 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
        }
 
        @Override
-       public void open(FileInputSplit split) throws IOException {
-               super.open(split);
+       protected void initializeSplit(FileInputSplit split, Long offset) throws IOException {
+               super.initializeSplit(split, offset);
 
                @SuppressWarnings("unchecked")
                FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
index 804d02b..45a59ca 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
@@ -156,8 +156,8 @@ public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
        }
 
        @Override
-       public void open(FileInputSplit split) throws IOException {
-               super.open(split);
+       public void initializeSplit(FileInputSplit split, Long offset) throws IOException {
+               super.initializeSplit(split, offset);
 
                pojoFields = new Field[pojoFieldNames.length];
 
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index 794703b..018b850 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -56,8 +56,8 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
        }
 
        @Override
-       public void open(FileInputSplit split) throws IOException {
-               super.open(split);
+       protected void initializeSplit(FileInputSplit split, Long offset) throws IOException {
+               super.initializeSplit(split, offset);
                Class<? extends FieldParser<OT>> parserType = FieldParser.getParserForType(primitiveClass);
                if (parserType == null) {
                        throw new IllegalArgumentException("The type '" + primitiveClass.getName() + "' is not supported for the primitive input format.");
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 99c569c..97e4f93 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.api.java.io;
 
 import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
@@ -40,9 +41,12 @@ import org.junit.Test;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
 import java.util.List;
 
 import static org.hamcrest.CoreMatchers.is;
@@ -67,15 +71,17 @@ public class CsvInputFormatTest {
 
        @Test
        public void testSplitCsvInputStreamInLargeBuffer() throws Exception {
-               testSplitCsvInputStream(1024 * 1024, false);
+               testSplitCsvInputStream(1024 * 1024, false, false);
+               testSplitCsvInputStream(1024 * 1024, false, true);
        }
 
        @Test
        public void testSplitCsvInputStreamInSmallBuffer() throws Exception {
-               testSplitCsvInputStream(2, false);
+               testSplitCsvInputStream(2, false, false);
+               testSplitCsvInputStream(1024 * 1024, false, true);
        }
 
-       private void testSplitCsvInputStream(int bufferSize, boolean failAtStart) throws Exception {
+       private void testSplitCsvInputStream(int bufferSize, boolean failAtStart, boolean compressed) throws Exception {
                final String fileContent =
                        "this is|1|2.0|\n" +
                        "a test|3|4.0|\n" +
@@ -83,7 +89,26 @@ public class CsvInputFormatTest {
                        "asdadas|5|30.0|\n";
 
                // create temporary file with 3 blocks
-               final File tempFile = File.createTempFile("input-stream-decoration-test", "tmp");
+               final File tempFile;
+
+               if (compressed) {
+                       tempFile = File.createTempFile("TextInputFormatTest", ".compressed");
+                       TextInputFormat.registerInflaterInputStreamFactory(
+                               "compressed",
+                               new InflaterInputStreamFactory<InputStream>() {
+                                       @Override
+                                       public InputStream create(InputStream in) {
+                                               return in;
+                                       }
+
+                                       @Override
+                                       public Collection<String> getCommonFileExtensions() {
+                                               return Collections.singletonList("compressed");
+                                       }
+                               });
+               } else {
+                       tempFile = File.createTempFile("input-stream-decoration-test", ".tmp");
+               }
                tempFile.deleteOnExit();
 
                try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
@@ -110,7 +135,9 @@ public class CsvInputFormatTest {
                Tuple3<String, Integer, Double> result = new Tuple3<>();
 
                for (FileInputSplit inputSplit : inputSplits) {
-                       assertEquals(inputSplit.getStart() + inputSplit.getLength(), offsetAtEndOfSplit[splitCounter]);
+                       if (!compressed) {
+                               assertEquals(inputSplit.getStart() + inputSplit.getLength(), offsetAtEndOfSplit[splitCounter]);
+                       }
                        splitCounter++;
 
                        format.open(inputSplit);
@@ -369,8 +396,8 @@ public class CsvInputFormatTest {
                        final CsvInputFormat<Tuple3<String, String, String>> format = new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", typeInfo);
 
                        final Configuration parameters = new Configuration();
-                       format.configure(parameters);
                        format.enableQuotedStringParsing('@');
+                       format.configure(parameters);
                        format.open(split);
 
                        Tuple3<String, String, String> result = new Tuple3<String, String, String>();
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index 53ed59d..b4aab32 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -290,8 +290,8 @@ public class RowCsvInputFormatTest {
                        BasicTypeInfo.STRING_TYPE_INFO};
 
                RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
-               format.configure(new Configuration());
                format.enableQuotedStringParsing('@');
+               format.configure(new Configuration());
                format.open(split);
 
                Row result = new Row(3);
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
index 676fd33..65c6397 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.api.java.io;
 
+import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.FileSystem;
@@ -31,9 +32,11 @@ import org.junit.rules.TemporaryFolder;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.InputStream;
 import java.io.OutputStreamWriter;
 import java.io.PrintStream;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 
@@ -195,4 +198,65 @@ public class TextInputFormatTest extends TestLogger {
                }
        }
 
+       @Test
+       public void testCompressedRead() throws IOException {
+               TextInputFormat.registerInflaterInputStreamFactory(
+                       "compressed",
+                       new InflaterInputStreamFactory<InputStream>() {
+                               @Override
+                               public InputStream create(InputStream in) {
+                                       return in;
+                               }
+
+                               @Override
+                               public Collection<String> getCommonFileExtensions() {
+                                       return Collections.singletonList("compressed");
+                               }
+                       });
+
+               final String first = "First line";
+               final String second = "Second line";
+
+               // create input file
+               File tempFile = File.createTempFile("TextInputFormatTest", ".compressed", temporaryFolder.getRoot());
+               tempFile.setWritable(true);
+
+               try (PrintStream ps = new PrintStream(tempFile)) {
+                       ps.println(first);
+                       ps.println(second);
+               }
+
+               TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+               Configuration parameters = new Configuration();
+               inputFormat.configure(parameters);
+
+               FileInputSplit[] splits = inputFormat.createInputSplits(1);
+               assertThat("expected at least one input split", splits.length, greaterThanOrEqualTo(1));
+
+               inputFormat.open(splits[0]);
+               try {
+                       assertFalse(inputFormat.reachedEnd());
+                       String result = inputFormat.nextRecord("");
+                       assertNotNull("Expecting first record here", result);
+                       assertEquals(first, result);
+                       assertFalse(inputFormat.reachedEnd());
+
+                       Long currentOffset = inputFormat.getCurrentState();
+                       inputFormat.close();
+
+                       inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+                       inputFormat.configure(parameters);
+                       inputFormat.reopen(splits[0], currentOffset);
+
+                       assertFalse(inputFormat.reachedEnd());
+                       result = inputFormat.nextRecord(result);
+                       assertNotNull("Expecting second record here", result);
+                       assertEquals(second, result);
+
+                       assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
+               } finally {
+                       inputFormat.close();
+               }
+       }
+
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
index 4f25bad..270a66b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
@@ -78,13 +78,13 @@ public class RichInputOutputITCase extends JavaProgramTestBase {
                }
 
                @Override
-               public void open(FileInputSplit split) throws IOException{
+               public void initializeSplit(FileInputSplit split, Long offset) throws IOException {
                        try {
                                getRuntimeContext().addAccumulator("DATA_SOURCE_ACCUMULATOR", counter);
                        } catch (UnsupportedOperationException e){
                                // the accumulator is already added
                        }
-                       super.open(split);
+                       super.initializeSplit(split, offset);
                }
 
                @Override
