[
https://issues.apache.org/jira/browse/FLINK-30314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17645326#comment-17645326
]
Dmitry Yaraev edited comment on FLINK-30314 at 12/9/22 2:44 PM:
----------------------------------------------------------------
??We would need to uncompress them to a resilient distributed storage to have
the proper size and be able to split.??
Do we really have to do this? Won't it add overhead? I would say that we could
treat such files as non-splittable and parallelize jobs at the file level.
Decompression of text files cannot be parallelized, so it will be done on a
single worker anyway. If we read from the compressed stream until the end, we
do not need the uncompressed file size. Once the files are decompressed, we can
redistribute the data using *{{rebalance}}* or *{{shuffle}}* operations.
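
The file-level approach described above can be sketched roughly as follows. This is a standalone Python simulation, not Flink code: the helper names ({{make_gzip_jsonl}}, {{read_whole_file}}, {{rebalance}}) and the in-memory test data are hypothetical, and the round-robin function is only loosely analogous to a rebalance step.

```python
import gzip
import io
import json
from concurrent.futures import ThreadPoolExecutor


def make_gzip_jsonl(records):
    """Build an in-memory gzipped line-delimited JSON file (hypothetical test data)."""
    raw = "".join(json.dumps(r) + "\n" for r in records).encode("utf-8")
    return gzip.compress(raw)


def read_whole_file(blob):
    """Treat the file as one non-splittable unit: decompress until end of stream."""
    with gzip.open(io.BytesIO(blob), mode="rt", encoding="utf-8") as stream:
        return [json.loads(line) for line in stream if line.strip()]


def rebalance(records, parallelism):
    """Round-robin redistribution of already-decompressed records."""
    partitions = [[] for _ in range(parallelism)]
    for i, record in enumerate(records):
        partitions[i % parallelism].append(record)
    return partitions


# Two files with 10 records each, mirroring the setup in the issue description.
files = [
    make_gzip_jsonl([{"my_field1": f * 10 + i} for i in range(10)])
    for f in range(2)
]

# One task per file: parallelism is at the file level, never inside a file.
with ThreadPoolExecutor(max_workers=len(files)) as pool:
    per_file = list(pool.map(read_whole_file, files))

# Only after decompression are the records spread across more workers.
all_records = [record for records in per_file for record in records]
partitions = rebalance(all_records, parallelism=4)
```

No uncompressed size is needed anywhere in this sketch; each reader simply stops at the end of its stream.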
> Unable to read all records from compressed line-delimited JSON files using
> Table API
> ------------------------------------------------------------------------------------
>
> Key: FLINK-30314
> URL: https://issues.apache.org/jira/browse/FLINK-30314
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.15.2
> Reporter: Dmitry Yaraev
> Priority: Major
>
> I am reading gzipped line-delimited JSON files in batch mode using the
> [FileSystem Connector|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/].
> To read the files, a new table is created with the following
> configuration:
> {code:sql}
> CREATE TEMPORARY TABLE `my_database`.`my_table` (
>   `my_field1` BIGINT,
>   `my_field2` INT,
>   `my_field3` VARCHAR(2147483647)
> ) WITH (
>   'connector' = 'filesystem',
>   'path' = 'path-to-input-dir',
>   'format' = 'json',
>   'json.ignore-parse-errors' = 'false',
>   'json.fail-on-missing-field' = 'true'
> )
> {code}
> In the input directory I have two files: input-00000.json.gz and
> input-00001.json.gz. As the filenames suggest, the files are compressed
> with GZIP. Each file contains 10 records. The issue is that only 2
> records are read from each file (4 in total). If decompressed versions of the
> same data files are used, all 20 records are read.
> As far as I understand, the problem may be related to the fact that the split
> length used when the files are read is in fact the length of the
> compressed file. As a result, files are closed before all records are read
> from them, because the read position of the decompressed file stream exceeds
> the split length. It probably makes sense to add a flag to
> {{FSDataInputStream}} so that we can identify whether the file is compressed.
> The flag can be set to true in {{InputStreamFSInputWrapper}}, because it is
> used for wrapping compressed file streams. With such a flag, it would be
> possible to identify non-splittable compressed files and rely only on the end
> of the stream.
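
The suspected mechanism and the proposed flag can be illustrated with a small standalone simulation. This is a rough Python emulation under stated assumptions, not Flink's actual reader logic: {{read_split}} and its {{compressed}} parameter are hypothetical names that only mirror the idea of the flag, and the record contents are made up so that the data compresses well.

```python
import gzip
import io
import json


def read_split(blob, split_length, compressed):
    """Read JSON lines from a gzip stream.

    compressed=False emulates the suspected bug: reading stops once the
    position in the *decompressed* stream reaches split_length.
    compressed=True emulates the proposed flag: split_length is ignored
    and the stream is read until its end.
    """
    records = []
    position = 0  # bytes consumed from the decompressed stream
    with gzip.GzipFile(fileobj=io.BytesIO(blob)) as stream:
        for line in stream:
            if not compressed and position >= split_length:
                break
            position += len(line)
            records.append(json.loads(line))
    return records


# Ten repetitive records, so the compressed file is much smaller than the raw data.
rows = [
    {"my_field1": i, "my_field2": i, "my_field3": "x" * 200}
    for i in range(10)
]
raw = "".join(json.dumps(row) + "\n" for row in rows).encode("utf-8")
blob = gzip.compress(raw)

# The split length is taken from the compressed file size, as suspected in the issue.
split_length = len(blob)

truncated = read_split(blob, split_length, compressed=False)
complete = read_split(blob, split_length, compressed=True)
```

Because the compressed size is smaller than the decompressed size, the first call stops early and returns fewer than 10 records, while the second returns all of them.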