[
https://issues.apache.org/jira/browse/FLINK-30314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648827#comment-17648827
]
Dmitry Yaraev edited comment on FLINK-30314 at 12/16/22 11:22 PM:
------------------------------------------------------------------
[~echauchot] Thanks. I fixed logging and now I can see the exception which is
thrown when zipped JSON is read. The only problem is that the application
itself doesn't fail. There are exceptions in the log but the application still
exits with code 0, which is strange. Another confusing thing is that the
exception says nothing about unsupported formats because it is an internal
exception of the Jackson parser. I would say that in both cases (gz and zip) an
appropriate error with an informative message about unsupported file format
should be thrown.
Also, I checked why it comes to the JSON parser without trying to decompress
the stream and found that ZIP is not on the list of supported compression
codecs (can be found in debug mode on [this
line|https://github.com/apache/flink/blob/af6eff873a53bbdc85a2b1018140754e65758e3e/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L153]).
The list includes the following formats:
* xz
* deflate
* gz
* gzip
* zst
* bz2
As we can see, it shouldn't even try to read this ZIP file because it's not
supported. But it does try to read it uncompressed and even decides that the
file should be processed by the Jackson parser. Probably there is one more bug
here.
was (Author: dyaraev):
[~echauchot] Thanks. I fixed logging and now I can see the exception which is
thrown when zipped JSON is read. The only problem is that the application
itself doesn't fail. There are exceptions in the log but the application still
exits with code 0, which is strange. Another confusing thing is that the
exception says nothing about unsupported formats because it is an internal
exception of the Jackson parser. I would say that in both cases (gz and zip) an
appropriate error with an informative message about unsupported file format
should be thrown.
Also, I checked why it comes to the JSON parser without trying to decompress
the stream and found that ZIP is not on the list of supported compression
codecs (can be found in debug mode on [this
line|https://github.com/apache/flink/blob/af6eff873a53bbdc85a2b1018140754e65758e3e/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java#L153]).
The list includes the following formats:
* xz
* deflate
* gz
* gzip
* zst
* bz2
As we can see, it shouldn't even try to read this ZIP file because it's not
supported. But it does try to read in uncompressed and even decides that the
file should be processed by the Jackson parser. Probably there is one more bug
here.
> Unable to read all records from compressed line-delimited JSON files using
> Table API
> ------------------------------------------------------------------------------------
>
> Key: FLINK-30314
> URL: https://issues.apache.org/jira/browse/FLINK-30314
> Project: Flink
> Issue Type: Improvement
> Components: API / Core, Connectors / FileSystem, Table SQL / API
> Affects Versions: 1.16.0, 1.15.2
> Reporter: Dmitry Yaraev
> Priority: Major
> Attachments: input.json, input.json.gz, input.json.zip
>
>
> I am reading gzipped JSON line-delimited files in the batch mode using
> [FileSystem
> Connector|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/].
> For reading the files a new table is created with the following
> configuration:
> {code:sql}
> CREATE TEMPORARY TABLE `my_database`.`my_table` (
> `my_field1` BIGINT,
> `my_field2` INT,
> `my_field3` VARCHAR(2147483647)
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 'path-to-input-dir',
> 'format' = 'json',
> 'json.ignore-parse-errors' = 'false',
> 'json.fail-on-missing-field' = 'true'
> ) {code}
> In the input directory I have two files: input-00000.json.gz and
> input-00001.json.gz. As it comes from the filenames, the files are compressed
> with GZIP. Each of the files contains 10 records. The issue is that only 2
> records from each file are read (4 in total). If decompressed versions of the
> same data files are used, all 20 records are read.
> As far as I understand, that problem may be related to the fact that split
> length, which is used when the files are read, is in fact the length of a
> compressed file. So files are closed before all records are read from them
> because read position of the decompressed file stream exceeds split length.
> Probably, it makes sense to add a flag to {{{}FSDataInputStream{}}}, so we
> could identify if the file compressed or not. The flag can be set to true in
> {{InputStreamFSInputWrapper}} because it is used for wrapping compressed file
> streams. With such a flag it could be possible to differentiate
> non-splittable compressed files and only rely on the end of the stream.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)