AHeise commented on code in PR #18299:
URL: https://github.com/apache/flink/pull/18299#discussion_r857500589
##########
flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java:
##########
@@ -899,6 +899,8 @@ protected FSDataInputStream decorateInputStream(
InflaterInputStreamFactory<?> inflaterInputStreamFactory =
getInflaterInputStreamFactory(fileSplit.getPath());
if (inflaterInputStreamFactory != null) {
+ // compressed format should use splitLength specially
+ this.splitLength = -1;
Review Comment:
This fix looks odd to me. First, we are modifying a parameter, which is
always a sign that the change belongs at the call site. Second, at this point I
cannot see any guarantee that we won't end up with 2 splits on the same file,
reading duplicate data, if we simply change `splitLength` here. Third, this
should probably use `READ_WHOLE_SPLIT_FLAG`.
All in all, the proper place to fix this is in `createInputSplits`.
The actual bug is in
```
if (unsplittable) { // should be testForUnsplittable(file)
    int splitNum = 0;
    for (final FileStatus file : files) {
        final FileSystem fs = file.getPath().getFileSystem();
        final BlockLocation[] blocks = fs.getFileBlockLocations(file, 0, file.getLen());
        Set<String> hosts = new HashSet<String>();
        for (BlockLocation block : blocks) {
            hosts.addAll(Arrays.asList(block.getHosts()));
        }
        long len = file.getLen();
        if (testForUnsplittable(file)) { // this doesn't make any sense at this point
            len = READ_WHOLE_SPLIT_FLAG;
        }
        FileInputSplit fis =
                new FileInputSplit(
                        splitNum++,
                        file.getPath(),
                        0,
                        len,
                        hosts.toArray(new String[hosts.size()]));
        inputSplits.add(fis);
    }
    return inputSplits.toArray(new FileInputSplit[inputSplits.size()]);
}
```
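To make the suggestion concrete, here is a minimal standalone sketch (not Flink code) of the idea: decide the split length once, up front in split creation, instead of mutating `splitLength` later in `decorateInputStream`. The class, the suffix-based unsplittability check, and the helper method are hypothetical stand-ins; `READ_WHOLE_SPLIT_FLAG` mirrors the `-1` sentinel that `FileInputFormat` uses to mean "read the whole split".

```java
// Hypothetical minimal model of the proposed fix: for an unsplittable
// (compressed) file, emit exactly one split flagged to read the whole
// file, so no two splits can ever cover the same file.
public class SplitLengthSketch {

    // Sentinel meaning "read the whole split"; -1 mirrors Flink's flag.
    static final long READ_WHOLE_SPLIT_FLAG = -1L;

    // Hypothetical stand-in for testForUnsplittable(file): in Flink this
    // checks for a non-splittable compression codec; here we model that
    // with a simple suffix check.
    static boolean testForUnsplittable(String path) {
        return path.endsWith(".gz") || path.endsWith(".deflate");
    }

    // The decision happens once, at split-creation time: an unsplittable
    // file gets the whole-split flag, a regular file gets its real length.
    static long splitLengthFor(String path, long fileLen) {
        return testForUnsplittable(path) ? READ_WHOLE_SPLIT_FLAG : fileLen;
    }

    public static void main(String[] args) {
        System.out.println(splitLengthFor("data.gz", 1024));  // -1
        System.out.println(splitLengthFor("data.csv", 1024)); // 1024
    }
}
```

With this shape, `decorateInputStream` never needs to touch `splitLength` at all; the compressed-file case is fully handled where the splits are created.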
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]