twalthr commented on a change in pull request #14174:
URL: https://github.com/apache/flink/pull/14174#discussion_r530903538



##########
File path: flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
##########
@@ -369,8 +396,8 @@ public void readMixedQuotedStringFields() {
                        final CsvInputFormat<Tuple3<String, String, String>> format = new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", typeInfo);
 
                        final Configuration parameters = new Configuration();
-                       format.configure(parameters);
                        format.enableQuotedStringParsing('@');
+                       format.configure(parameters);

Review comment:
       Can you explain this change?
   The docs for `configure` say `This method is always called first on a newly instantiated input format.`

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
##########
@@ -738,15 +739,18 @@ public void reopen(FileInputSplit split, Long state) throws IOException {
                Preconditions.checkArgument(state == -1 || state >= split.getStart(),
                        " Illegal offset "+ state +", smaller than the splits start=" + split.getStart());
 
-               try {
+               // If we checkpointed at the beginning of split simply call open
+               if (split.getStart() == state) {
                        this.open(split);
-               } finally {
-                       this.offset = state;
+                       return;
                }
 
-               if (state > this.splitStart + split.getLength()) {
+               super.open(split);

Review comment:
       What is the main difference between `this.open()` and `super.open()`?
   Maybe we should give this logic a name and call it explicitly here.
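
   To illustrate the suggestion, here is a minimal sketch with made-up classes. `BaseFormat`, `DelimitedFormat`, and `openUnderlyingStream` are hypothetical names, not Flink's API, and splits are reduced to a plain `long` start offset. The point is only that the parent's open logic gets a descriptive name, so `reopen` does not have to choose between `this.open()` and `super.open()` at the call site.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the parent format; not Flink's FileInputFormat.
class BaseFormat {
    final List<String> calls = new ArrayList<>();

    void open(long splitStart) {
        calls.add("base.open");
    }
}

// Hypothetical stand-in for a delimited format that can resume from a checkpointed offset.
class DelimitedFormat extends BaseFormat {
    long offset;

    @Override
    void open(long splitStart) {
        openUnderlyingStream(splitStart);
        offset = splitStart;
    }

    void reopen(long splitStart, long state) {
        // Checkpointed at the beginning of the split: a regular open is enough.
        if (state == splitStart) {
            open(splitStart);
            return;
        }
        // Otherwise open only the underlying stream and resume from the stored offset.
        openUnderlyingStream(splitStart);
        offset = state;
    }

    // The named step: runs the parent's open logic without this class's own setup.
    private void openUnderlyingStream(long splitStart) {
        super.open(splitStart);
    }
}
```

   With a name like this, the intent of the call in `reopen` is visible without reading the parent class.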

##########
File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
##########
@@ -195,4 +198,65 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) throws
                }
        }
 
+       @Test
+       public void testCompressedRead() throws IOException {
+               final String first = "First line";
+               final String second = "Second line";
+
+               // create input file
+               File tempFile = File.createTempFile("TextInputFormatTest", ".compressed", temporaryFolder.getRoot());
+               tempFile.setWritable(true);
+
+               try (PrintStream ps = new PrintStream(tempFile)) {
+                       ps.println(first);
+                       ps.println(second);
+               }
+
+               TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+               TextInputFormat.registerInflaterInputStreamFactory(
+                       "compressed",
+                       new InflaterInputStreamFactory<InputStream>() {
+                               @Override
+                               public InputStream create(InputStream in) throws IOException {

Review comment:
       nit: remove the unused `throws IOException` here and in the other tests
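
   For reference, a small sketch of why the clause can go: in Java an overriding method may declare fewer checked exceptions than the method it overrides. `StreamFactory` and `PassThroughFactory` are hypothetical stand-ins for the factory in the diff, not Flink classes; the same applies to the anonymous class used in the test.

```java
import java.io.IOException;
import java.io.InputStream;

// Hypothetical stand-in for the factory interface; the declared method may throw IOException.
interface StreamFactory<T extends InputStream> {
    T create(InputStream in) throws IOException;
}

// The override omits `throws IOException` because its body cannot throw it.
final class PassThroughFactory implements StreamFactory<InputStream> {
    @Override
    public InputStream create(InputStream in) {
        return in;
    }
}
```

   Callers that go through the interface type still have to handle `IOException`; only the implementation gets shorter.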

##########
File path: flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
##########
@@ -195,4 +198,65 @@ private void testRemovingTrailingCR(String lineBreaker, String delimiter) throws
                }
        }
 
+       @Test
+       public void testCompressedRead() throws IOException {
+               final String first = "First line";
+               final String second = "Second line";
+
+               // create input file
+               File tempFile = File.createTempFile("TextInputFormatTest", ".compressed", temporaryFolder.getRoot());
+               tempFile.setWritable(true);
+
+               try (PrintStream ps = new PrintStream(tempFile)) {
+                       ps.println(first);
+                       ps.println(second);
+               }
+
+               TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+               TextInputFormat.registerInflaterInputStreamFactory(

Review comment:
       Move this registration to the beginning of the test, and make sure to 
remove the registered factory again after the test completes.
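
   Roughly the shape I have in mind, as a sketch only. `FactoryRegistry` is a hypothetical static registry that stands in for the format's factory map; I am not assuming that Flink exposes a public method to remove a registered factory, so the `unregister` call marks where the cleanup belongs rather than which API to use.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical static registry; stands in for the shared, JVM-wide factory map.
final class FactoryRegistry {
    private static final Map<String, Object> FACTORIES = new HashMap<>();

    private FactoryRegistry() {}

    static void register(String extension, Object factory) {
        FACTORIES.put(extension, factory);
    }

    static void unregister(String extension) {
        FACTORIES.remove(extension);
    }

    static boolean isRegistered(String extension) {
        return FACTORIES.containsKey(extension);
    }
}

final class CompressedReadTestSketch {
    private CompressedReadTestSketch() {}

    // Returns whether the factory was visible while the test body ran.
    static boolean runTestBody() {
        // Register first, before any file or format is created.
        FactoryRegistry.register("compressed", new Object());
        try {
            // The actual test body (create file, open format, read lines) would go here.
            return FactoryRegistry.isRegistered("compressed");
        } finally {
            // Undo the static modification even if the test body throws.
            FactoryRegistry.unregister("compressed");
        }
    }
}
```

   Since the registry is static, leaving the entry behind could affect other tests that run in the same JVM.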

##########
File path: flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
##########
@@ -67,24 +71,45 @@
 
        @Test
        public void testSplitCsvInputStreamInLargeBuffer() throws Exception {
-               testSplitCsvInputStream(1024 * 1024, false);
+               testSplitCsvInputStream(1024 * 1024, false, false);
+               testSplitCsvInputStream(1024 * 1024, false, true);
        }
 
        @Test
        public void testSplitCsvInputStreamInSmallBuffer() throws Exception {
-               testSplitCsvInputStream(2, false);
+               testSplitCsvInputStream(2, false, false);
+               testSplitCsvInputStream(1024 * 1024, false, true);
        }
 
-       private void testSplitCsvInputStream(int bufferSize, boolean failAtStart) throws Exception {
+       private void testSplitCsvInputStream(int bufferSize, boolean failAtStart, boolean compressed) throws Exception {
                final String fileContent =
                        "this is|1|2.0|\n" +
                        "a test|3|4.0|\n" +
                        "#next|5|6.0|\n" +
                        "asdadas|5|30.0|\n";
 
                // create temporary file with 3 blocks
-               final File tempFile = File.createTempFile("input-stream-decoration-test", "tmp");
-               tempFile.deleteOnExit();
+               final File tempFile;
+
+               if (compressed) {
+                       tempFile = File.createTempFile("TextInputFormatTest", ".compressed");
+                       TextInputFormat.registerInflaterInputStreamFactory(
+                               "compressed",
+                               new InflaterInputStreamFactory<InputStream>() {
+                                       @Override
+                                       public InputStream create(InputStream in) throws IOException {
+                                               return in;
+                                       }
+
+                                       @Override
+                                       public Collection<String> getCommonFileExtensions() {
+                                               return Collections.singletonList("compressed");
+                                       }
+                               });
+               } else {
+                       tempFile = File.createTempFile("input-stream-decoration-test", ".tmp");
+                       tempFile.deleteOnExit();

Review comment:
       nit: remove this line, or also delete the other file on exit?
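
   If the cleanup stays, one option is to apply it to both branches in a single place. A minimal sketch, assuming a hypothetical helper `TempFiles.create` that is not part of the PR:

```java
import java.io.File;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical helper: picks the file name by branch, then applies one cleanup rule to both.
final class TempFiles {
    private TempFiles() {}

    static File create(boolean compressed) {
        try {
            final File tempFile = compressed
                    ? File.createTempFile("TextInputFormatTest", ".compressed")
                    : File.createTempFile("input-stream-decoration-test", ".tmp");
            // Same treatment for both files, so neither is left behind.
            tempFile.deleteOnExit();
            return tempFile;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

   Creating the files under a JUnit `TemporaryFolder` rule, as `TextInputFormatTest` already does, would make the explicit `deleteOnExit()` unnecessary.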

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
##########
@@ -755,10 +759,13 @@ public void reopen(FileInputSplit split, Long state) throws IOException {
                                fillBuffer(0);
                        } else {
                                this.splitLength = this.splitStart + split.getLength() - this.offset;
-                               if (splitLength <= 0) {
-                                       this.end = true;
-                               }
                        }
                }
+
+               initializeSplit(split, state);

Review comment:
       Do we need to initialize a split if it has already ended?

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
##########
@@ -300,14 +300,14 @@ protected void setFieldsGeneric(boolean[] includedMask, Class<?>[] fieldTypes) {
        // --------------------------------------------------------------------------------------------
        //  Runtime methods
        // --------------------------------------------------------------------------------------------
-       
+
        @Override
-       public void open(FileInputSplit split) throws IOException {
-               super.open(split);
+       public void initializeSplit(FileInputSplit split, Long offset) throws IOException {
+               super.initializeSplit(split, offset);
 
                // instantiate the parsers
                FieldParser<?>[] parsers = new FieldParser<?>[fieldTypes.length];

Review comment:
       Move this logic to the `open` method? It is not really related to the 
split initialization itself.

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
##########
@@ -755,10 +759,13 @@ public void reopen(FileInputSplit split, Long state) throws IOException {
                                fillBuffer(0);
                        } else {
                                this.splitLength = this.splitStart + split.getLength() - this.offset;
-                               if (splitLength <= 0) {
-                                       this.end = true;
-                               }
                        }
                }
+
+               initializeSplit(split, state);
+       }
+
+       public void initializeSplit(FileInputSplit split, Long state) throws IOException {

Review comment:
       Add a `@Nullable` annotation to the offset parameter (`state`).

##########
File path: flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
##########
@@ -755,10 +759,13 @@ public void reopen(FileInputSplit split, Long state) throws IOException {
                                fillBuffer(0);
                        } else {
                                this.splitLength = this.splitStart + split.getLength() - this.offset;
-                               if (splitLength <= 0) {
-                                       this.end = true;
-                               }
                        }
                }
+
+               initializeSplit(split, state);
+       }
+
+       public void initializeSplit(FileInputSplit split, Long state) throws IOException {

Review comment:
       Make it `protected` and document the semantics?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]