twalthr commented on a change in pull request #14174:
URL: https://github.com/apache/flink/pull/14174#discussion_r530903538
##########
File path:
flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
##########
@@ -369,8 +396,8 @@ public void readMixedQuotedStringFields() {
final CsvInputFormat<Tuple3<String, String, String>>
format = new TupleCsvInputFormat<Tuple3<String, String, String>>(PATH, "\n",
"|", typeInfo);
final Configuration parameters = new Configuration();
- format.configure(parameters);
format.enableQuotedStringParsing('@');
+ format.configure(parameters);
Review comment:
Can you explain this change?
The docs say: `This method is always called first on a newly instantiated
input format.`
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
##########
@@ -738,15 +739,18 @@ public void reopen(FileInputSplit split, Long state)
throws IOException {
Preconditions.checkArgument(state == -1 || state >=
split.getStart(),
" Illegal offset "+ state +", smaller than the splits
start=" + split.getStart());
- try {
+ // If we checkpointed at the beginning of split simply call open
+ if (split.getStart() == state) {
this.open(split);
- } finally {
- this.offset = state;
+ return;
}
- if (state > this.splitStart + split.getLength()) {
+ super.open(split);
Review comment:
What is the main difference between `this.open()` and `super.open()`?
Maybe we should give this logic a name and call it explicitly here.
##########
File path:
flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
##########
@@ -195,4 +198,65 @@ private void testRemovingTrailingCR(String lineBreaker,
String delimiter) throws
}
}
+ @Test
+ public void testCompressedRead() throws IOException {
+ final String first = "First line";
+ final String second = "Second line";
+
+ // create input file
+ File tempFile = File.createTempFile("TextInputFormatTest",
".compressed", temporaryFolder.getRoot());
+ tempFile.setWritable(true);
+
+ try (PrintStream ps = new PrintStream(tempFile)) {
+ ps.println(first);
+ ps.println(second);
+ }
+
+ TextInputFormat inputFormat = new TextInputFormat(new
Path(tempFile.toURI().toString()));
+ TextInputFormat.registerInflaterInputStreamFactory(
+ "compressed",
+ new InflaterInputStreamFactory<InputStream>() {
+ @Override
+ public InputStream create(InputStream in)
throws IOException {
Review comment:
nit: remove the unused `throws IOException` clause here and in the other tests.
##########
File path:
flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java
##########
@@ -195,4 +198,65 @@ private void testRemovingTrailingCR(String lineBreaker,
String delimiter) throws
}
}
+ @Test
+ public void testCompressedRead() throws IOException {
+ final String first = "First line";
+ final String second = "Second line";
+
+ // create input file
+ File tempFile = File.createTempFile("TextInputFormatTest",
".compressed", temporaryFolder.getRoot());
+ tempFile.setWritable(true);
+
+ try (PrintStream ps = new PrintStream(tempFile)) {
+ ps.println(first);
+ ps.println(second);
+ }
+
+ TextInputFormat inputFormat = new TextInputFormat(new
Path(tempFile.toURI().toString()));
+ TextInputFormat.registerInflaterInputStreamFactory(
Review comment:
Move this to the beginning of the test, and make sure to undo the
factory registration again once the test completes.
##########
File path:
flink-java/src/test/java/org/apache/flink/api/java/io/CsvInputFormatTest.java
##########
@@ -67,24 +71,45 @@
@Test
public void testSplitCsvInputStreamInLargeBuffer() throws Exception {
- testSplitCsvInputStream(1024 * 1024, false);
+ testSplitCsvInputStream(1024 * 1024, false, false);
+ testSplitCsvInputStream(1024 * 1024, false, true);
}
@Test
public void testSplitCsvInputStreamInSmallBuffer() throws Exception {
- testSplitCsvInputStream(2, false);
+ testSplitCsvInputStream(2, false, false);
+ testSplitCsvInputStream(1024 * 1024, false, true);
}
- private void testSplitCsvInputStream(int bufferSize, boolean
failAtStart) throws Exception {
+ private void testSplitCsvInputStream(int bufferSize, boolean
failAtStart, boolean compressed) throws Exception {
final String fileContent =
"this is|1|2.0|\n" +
"a test|3|4.0|\n" +
"#next|5|6.0|\n" +
"asdadas|5|30.0|\n";
// create temporary file with 3 blocks
- final File tempFile =
File.createTempFile("input-stream-decoration-test", "tmp");
- tempFile.deleteOnExit();
+ final File tempFile;
+
+ if (compressed) {
+ tempFile = File.createTempFile("TextInputFormatTest",
".compressed");
+ TextInputFormat.registerInflaterInputStreamFactory(
+ "compressed",
+ new InflaterInputStreamFactory<InputStream>() {
+ @Override
+ public InputStream create(InputStream
in) throws IOException {
+ return in;
+ }
+
+ @Override
+ public Collection<String>
getCommonFileExtensions() {
+ return
Collections.singletonList("compressed");
+ }
+ });
+ } else {
+ tempFile =
File.createTempFile("input-stream-decoration-test", ".tmp");
+ tempFile.deleteOnExit();
Review comment:
nit: either remove this line, or also delete the other (compressed) temp file?
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
##########
@@ -755,10 +759,13 @@ public void reopen(FileInputSplit split, Long state)
throws IOException {
fillBuffer(0);
} else {
this.splitLength = this.splitStart +
split.getLength() - this.offset;
- if (splitLength <= 0) {
- this.end = true;
- }
}
}
+
+ initializeSplit(split, state);
Review comment:
Do we need to initialize the split at all if it has already ended?
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/io/GenericCsvInputFormat.java
##########
@@ -300,14 +300,14 @@ protected void setFieldsGeneric(boolean[] includedMask,
Class<?>[] fieldTypes) {
//
--------------------------------------------------------------------------------------------
// Runtime methods
//
--------------------------------------------------------------------------------------------
-
+
@Override
- public void open(FileInputSplit split) throws IOException {
- super.open(split);
+ public void initializeSplit(FileInputSplit split, Long offset) throws
IOException {
+ super.initializeSplit(split, offset);
// instantiate the parsers
FieldParser<?>[] parsers = new
FieldParser<?>[fieldTypes.length];
Review comment:
Move this logic to the `open()` method? Instantiating the field parsers is
not really related to initializing the split itself.
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
##########
@@ -755,10 +759,13 @@ public void reopen(FileInputSplit split, Long state)
throws IOException {
fillBuffer(0);
} else {
this.splitLength = this.splitStart +
split.getLength() - this.offset;
- if (splitLength <= 0) {
- this.end = true;
- }
}
}
+
+ initializeSplit(split, state);
+ }
+
+ public void initializeSplit(FileInputSplit split, Long state) throws
IOException {
Review comment:
Add a `@Nullable` annotation to the offset parameter (named `state` here).
##########
File path:
flink-core/src/main/java/org/apache/flink/api/common/io/DelimitedInputFormat.java
##########
@@ -755,10 +759,13 @@ public void reopen(FileInputSplit split, Long state)
throws IOException {
fillBuffer(0);
} else {
this.splitLength = this.splitStart +
split.getLength() - this.offset;
- if (splitLength <= 0) {
- this.end = true;
- }
}
}
+
+ initializeSplit(split, state);
+ }
+
+ public void initializeSplit(FileInputSplit split, Long state) throws
IOException {
Review comment:
Make it `protected` and document its semantics?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]