This is an automated email from the ASF dual-hosted git repository.
gyfora pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 02f1695 [FLINK-20221] DelimitedInputFormat does not restore compressed splits
02f1695 is described below
commit 02f16958346483d91cd59cb902e0cce63fa5875f
Author: Gyula Fora <[email protected]>
AuthorDate: Wed Nov 18 14:25:05 2020 +0100
[FLINK-20221] DelimitedInputFormat does not restore compressed splits
Closes #14174
---
.../flink/api/common/io/DelimitedInputFormat.java | 54 ++++++++++++------
.../flink/api/common/io/GenericCsvInputFormat.java | 12 ++--
.../apache/flink/api/java/io/CsvInputFormat.java | 4 +-
.../flink/api/java/io/PojoCsvInputFormat.java | 4 +-
.../flink/api/java/io/PrimitiveInputFormat.java | 4 +-
.../flink/api/java/io/CsvInputFormatTest.java | 39 +++++++++++--
.../flink/api/java/io/RowCsvInputFormatTest.java | 2 +-
.../flink/api/java/io/TextInputFormatTest.java | 64 ++++++++++++++++++++++
.../flink/test/io/RichInputOutputITCase.java | 4 +-
9 files changed, 149 insertions(+), 38 deletions(-)
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
index 0c09a4c..a69b5bd 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
@@ -32,6 +32,8 @@ import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
@@ -482,6 +484,7 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
} else {
fillBuffer(0);
}
+ initializeSplit(split, null);
}
private void initBuffers() {
@@ -738,27 +741,44 @@ public abstract class DelimitedInputFormat<OT> extends FileInputFormat<OT> imple
        Preconditions.checkArgument(state == -1 || state >= split.getStart(),
            " Illegal offset "+ state +", smaller than the splits start=" + split.getStart());
- try {
+ // If we are already at the end of the split, just return
+        if (split.getLength() != -1 && state > split.getStart() + split.getLength()) {
+ this.end = true;
+ return;
+ }
+
+        // If the checkpointed offset is at the beginning of the split we fall back to the regular
+        // open logic
+ if (split.getStart() == state) {
this.open(split);
- } finally {
- this.offset = state;
+ return;
}
- if (state > this.splitStart + split.getLength()) {
- this.end = true;
- } else if (state > split.getStart()) {
- initBuffers();
+        // Otherwise we have to seek to the checkpointed offset and start reading from there
+ super.open(split);
+ this.offset = state;
- this.stream.seek(this.offset);
- if (split.getLength() == -1) {
- // this is the case for unsplittable files
- fillBuffer(0);
- } else {
-            this.splitLength = this.splitStart + split.getLength() - this.offset;
- if (splitLength <= 0) {
- this.end = true;
- }
- }
+ initBuffers();
+
+ this.stream.seek(this.offset);
+ if (split.getLength() == -1) {
+ // this is the case for unsplittable files
+ fillBuffer(0);
+ } else {
+            this.splitLength = this.splitStart + split.getLength() - this.offset;
}
+
+ initializeSplit(split, state);
+ }
+
+    /**
+     * Initialization method that is called after opening or reopening an input split.
+     *
+     * @param split Split that was opened or reopened
+     * @param state Checkpointed state if the split was reopened
+     * @throws IOException
+     */
+    protected void initializeSplit(FileInputSplit split, @Nullable Long state) throws IOException {
+
}
}
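For context on the new hook: initializeSplit(FileInputSplit, Long) runs at the end of both open(split) (state == null) and reopen(split, state) (state == the checkpointed offset), so per-split setup that subclasses previously put into open() also runs when a split, in particular a compressed unsplittable one, is restored from state. A minimal sketch of the pattern, assuming a hypothetical subclass (MyDelimitedFormat and its field are illustrative, not part of this commit):

    import java.io.IOException;
    import javax.annotation.Nullable;
    import org.apache.flink.api.common.io.DelimitedInputFormat;
    import org.apache.flink.core.fs.FileInputSplit;

    public class MyDelimitedFormat extends DelimitedInputFormat<String> {

        // Split-local state that must be rebuilt after open() as well as after reopen().
        private transient long recordsInSplit;

        @Override
        protected void initializeSplit(FileInputSplit split, @Nullable Long state) throws IOException {
            super.initializeSplit(split, state);
            // state == null on a fresh open, otherwise it is the restored offset.
            recordsInSplit = 0;
        }

        @Override
        public String readRecord(String reuse, byte[] bytes, int offset, int numBytes) {
            recordsInSplit++;
            return new String(bytes, offset, numBytes, getCharset());
        }
    }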
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
index 3b65a42..068890f 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
@@ -300,14 +300,14 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
    // --------------------------------------------------------------------------------------------
    //  Runtime methods
    // --------------------------------------------------------------------------------------------
-
+
@Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
+    protected void initializeSplit(FileInputSplit split, Long offset) throws IOException {
+ super.initializeSplit(split, offset);
// instantiate the parsers
        FieldParser<?>[] parsers = new FieldParser<?>[fieldTypes.length];
-
+
for (int i = 0; i < fieldTypes.length; i++) {
if (fieldTypes[i] != null) {
                Class<? extends FieldParser<?>> parserType = FieldParser.getParserForType(fieldTypes[i]);
@@ -330,9 +330,9 @@ public abstract class GenericCsvInputFormat<OT> extends DelimitedInputFormat<OT>
}
}
this.fieldParsers = parsers;
-
+
        // skip the first line, if we are at the beginning of a file and have the option set
-        if (this.skipFirstLineAsHeader && this.splitStart == 0) {
+        if (this.skipFirstLineAsHeader && ((offset == null && split.getStart() == 0) || (offset != null && offset == 0))) {
readLine(); // read and ignore
}
}
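Note on the header check above: the first line is now skipped only when reading actually starts at the beginning of the file, on either code path. Spelled out as a sketch of the same condition (local names are illustrative):

    boolean freshOpenAtFileStart = offset == null && split.getStart() == 0;  // open() of the first split
    boolean reopenAtFileStart = offset != null && offset == 0;               // reopen() restored at offset 0
    boolean skipHeader = this.skipFirstLineAsHeader && (freshOpenAtFileStart || reopenAtFileStart);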
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
index 84c9af1..df71611 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/CsvInputFormat.java
@@ -50,8 +50,8 @@ public abstract class CsvInputFormat<OUT> extends GenericCsvInputFormat<OUT> {
}
@Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
+    protected void initializeSplit(FileInputSplit split, Long offset) throws IOException {
+ super.initializeSplit(split, offset);
@SuppressWarnings("unchecked")
        FieldParser<Object>[] fieldParsers = (FieldParser<Object>[]) getFieldParsers();
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
index 804d02b..45a59ca 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PojoCsvInputFormat.java
@@ -156,8 +156,8 @@ public class PojoCsvInputFormat<OUT> extends CsvInputFormat<OUT> {
}
@Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
+    public void initializeSplit(FileInputSplit split, Long offset) throws IOException {
+ super.initializeSplit(split, offset);
pojoFields = new Field[pojoFieldNames.length];
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
index 794703b..018b850 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/io/PrimitiveInputFormat.java
@@ -56,8 +56,8 @@ public class PrimitiveInputFormat<OT> extends DelimitedInputFormat<OT> {
}
@Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
+    protected void initializeSplit(FileInputSplit split, Long offset) throws IOException {
+ super.initializeSplit(split, offset);
        Class<? extends FieldParser<OT>> parserType = FieldParser.getParserForType(primitiveClass);
if (parserType == null) {
            throw new IllegalArgumentException("The type '" + primitiveClass.getName() + "' is not supported for the primitive input format.");
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
index 99c569c..97e4f93 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
@@ -19,6 +19,7 @@
package org.apache.flink.api.java.io;
import org.apache.flink.api.common.io.ParseException;
+import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
@@ -40,9 +41,12 @@ import org.junit.Test;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
import java.util.List;
import static org.hamcrest.CoreMatchers.is;
@@ -67,15 +71,17 @@ public class CsvInputFormatTest {
@Test
public void testSplitCsvInputStreamInLargeBuffer() throws Exception {
- testSplitCsvInputStream(1024 * 1024, false);
+ testSplitCsvInputStream(1024 * 1024, false, false);
+ testSplitCsvInputStream(1024 * 1024, false, true);
}
@Test
public void testSplitCsvInputStreamInSmallBuffer() throws Exception {
- testSplitCsvInputStream(2, false);
+ testSplitCsvInputStream(2, false, false);
+ testSplitCsvInputStream(1024 * 1024, false, true);
}
-    private void testSplitCsvInputStream(int bufferSize, boolean failAtStart) throws Exception {
+    private void testSplitCsvInputStream(int bufferSize, boolean failAtStart, boolean compressed) throws Exception {
final String fileContent =
"this is|1|2.0|\n" +
"a test|3|4.0|\n" +
@@ -83,7 +89,26 @@ public class CsvInputFormatTest {
"asdadas|5|30.0|\n";
// create temporary file with 3 blocks
-        final File tempFile = File.createTempFile("input-stream-decoration-test", "tmp");
+ final File tempFile;
+
+ if (compressed) {
+            tempFile = File.createTempFile("TextInputFormatTest", ".compressed");
+ TextInputFormat.registerInflaterInputStreamFactory(
+ "compressed",
+ new InflaterInputStreamFactory<InputStream>() {
+ @Override
+                    public InputStream create(InputStream in) {
+ return in;
+ }
+
+ @Override
+                    public Collection<String> getCommonFileExtensions() {
+                        return Collections.singletonList("compressed");
+ }
+ });
+ } else {
+            tempFile = File.createTempFile("input-stream-decoration-test", ".tmp");
+ }
tempFile.deleteOnExit();
        try (FileOutputStream fileOutputStream = new FileOutputStream(tempFile)) {
@@ -110,7 +135,9 @@ public class CsvInputFormatTest {
Tuple3<String, Integer, Double> result = new Tuple3<>();
for (FileInputSplit inputSplit : inputSplits) {
-            assertEquals(inputSplit.getStart() + inputSplit.getLength(), offsetAtEndOfSplit[splitCounter]);
+            if (!compressed) {
+                assertEquals(inputSplit.getStart() + inputSplit.getLength(), offsetAtEndOfSplit[splitCounter]);
+ }
splitCounter++;
format.open(inputSplit);
@@ -369,8 +396,8 @@ public class CsvInputFormatTest {
        final CsvInputFormat<Tuple3<String, String, String>> format = new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, "\n", "|", typeInfo);
final Configuration parameters = new Configuration();
- format.configure(parameters);
format.enableQuotedStringParsing('@');
+ format.configure(parameters);
format.open(split);
        Tuple3<String, String, String> result = new Tuple3<String, String, String>();
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
index 53ed59d..b4aab32 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/RowCsvInputFormatTest.java
@@ -290,8 +290,8 @@ public class RowCsvInputFormatTest {
BasicTypeInfo.STRING_TYPE_INFO};
        RowCsvInputFormat format = new RowCsvInputFormat(PATH, fieldTypes, "\n", "|");
- format.configure(new Configuration());
format.enableQuotedStringParsing('@');
+ format.configure(new Configuration());
format.open(split);
Row result = new Row(3);
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
index 676fd33..65c6397 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.api.java.io;
+import org.apache.flink.api.common.io.compression.InflaterInputStreamFactory;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileInputSplit;
import org.apache.flink.core.fs.FileSystem;
@@ -31,9 +32,11 @@ import org.junit.rules.TemporaryFolder;
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.InputStream;
import java.io.OutputStreamWriter;
import java.io.PrintStream;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
@@ -195,4 +198,65 @@ public class TextInputFormatTest extends TestLogger {
}
}
+ @Test
+ public void testCompressedRead() throws IOException {
+ TextInputFormat.registerInflaterInputStreamFactory(
+ "compressed",
+ new InflaterInputStreamFactory<InputStream>() {
+ @Override
+ public InputStream create(InputStream in) {
+ return in;
+ }
+
+ @Override
+                public Collection<String> getCommonFileExtensions() {
+                    return Collections.singletonList("compressed");
+ }
+ });
+
+ final String first = "First line";
+ final String second = "Second line";
+
+ // create input file
+        File tempFile = File.createTempFile("TextInputFormatTest", ".compressed", temporaryFolder.getRoot());
+ tempFile.setWritable(true);
+
+ try (PrintStream ps = new PrintStream(tempFile)) {
+ ps.println(first);
+ ps.println(second);
+ }
+
+        TextInputFormat inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+ Configuration parameters = new Configuration();
+ inputFormat.configure(parameters);
+
+ FileInputSplit[] splits = inputFormat.createInputSplits(1);
+        assertThat("expected at least one input split", splits.length, greaterThanOrEqualTo(1));
+
+ inputFormat.open(splits[0]);
+ try {
+ assertFalse(inputFormat.reachedEnd());
+ String result = inputFormat.nextRecord("");
+ assertNotNull("Expecting first record here", result);
+ assertEquals(first, result);
+ assertFalse(inputFormat.reachedEnd());
+
+ Long currentOffset = inputFormat.getCurrentState();
+ inputFormat.close();
+
+            inputFormat = new TextInputFormat(new Path(tempFile.toURI().toString()));
+ inputFormat.configure(parameters);
+ inputFormat.reopen(splits[0], currentOffset);
+
+ assertFalse(inputFormat.reachedEnd());
+ result = inputFormat.nextRecord(result);
+ assertNotNull("Expecting second record here", result);
+ assertEquals(second, result);
+
+            assertTrue(inputFormat.reachedEnd() || null == inputFormat.nextRecord(result));
+ } finally {
+ inputFormat.close();
+ }
+ }
+
}
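The new test exercises the checkpoint/restore cycle through an identity "compressed" factory; the same sequence applies to a genuinely compressed file, which Flink treats as a single unsplittable split. A hedged usage sketch, assumed to live inside some test or helper class (the file path and variable names are illustrative; built-in inflater factories cover extensions such as .gz):

    import java.io.IOException;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.core.fs.FileInputSplit;
    import org.apache.flink.core.fs.Path;

    static void resumeCompressedRead() throws IOException {
        TextInputFormat format = new TextInputFormat(new Path("file:///tmp/lines.txt.gz"));
        format.configure(new Configuration());
        FileInputSplit[] splits = format.createInputSplits(1);

        format.open(splits[0]);
        String first = format.nextRecord("");
        // Offset into the decompressed stream, as tracked by DelimitedInputFormat.
        Long state = format.getCurrentState();
        format.close();

        // With this fix, reopening the unsplittable compressed split resumes at the
        // stored offset; before this change, restoring such a split did not work
        // correctly (see FLINK-20221).
        TextInputFormat restored = new TextInputFormat(new Path("file:///tmp/lines.txt.gz"));
        restored.configure(new Configuration());
        restored.reopen(splits[0], state);
        String second = restored.nextRecord(first);
        restored.close();
    }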
diff --git a/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
index 4f25bad..270a66b 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/io/RichInputOutputITCase.java
@@ -78,13 +78,13 @@ public class RichInputOutputITCase extends JavaProgramTestBase {
}
@Override
- public void open(FileInputSplit split) throws IOException{
+        public void initializeSplit(FileInputSplit split, Long offset) throws IOException {
try {
getRuntimeContext().addAccumulator("DATA_SOURCE_ACCUMULATOR", counter);
} catch (UnsupportedOperationException e){
// the accumulator is already added
}
- super.open(split);
+ super.initializeSplit(split, offset);
}
@Override