[
https://issues.apache.org/jira/browse/FLINK-30314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17648708#comment-17648708
]
Dmitry Yaraev edited comment on FLINK-30314 at 12/16/22 4:32 PM:
-----------------------------------------------------------------
[~echauchot], [~martijnvisser] Thank you for your efforts in verifying the
issue. It would be great to have this implemented (as a bug fix or a new
feature). As promised, I ran several tests using the code from the above
repository, and I was able to reproduce the issue with version 1.16.0 as well. I
changed the code a bit so that it writes its output to the console:
{code:java|title=TestCompressedJson.java|borderStyle=solid}
package examples.flinktest;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class TestCompressedJson {

    public static void main(final String[] args) {
        final String inputDir = args[0];

        // init Table Env
        final EnvironmentSettings environmentSettings = EnvironmentSettings
                .newInstance()
                .inBatchMode()
                .build();
        final TableEnvironment tableEnv = TableEnvironment.create(environmentSettings);

        // create source
        tableEnv.executeSql(String.format(
                "CREATE TEMPORARY TABLE SourceTable (my_field1 BIGINT, my_field2 INT, my_field3 VARCHAR(2147483647)) "
                        + "WITH ('connector' = 'filesystem', 'path' = '%s', 'format' = 'json')",
                inputDir));

        // create sink
        tableEnv.executeSql(
                "CREATE TEMPORARY TABLE SinkTable (my_field1 BIGINT, my_field2 INT, my_field3 VARCHAR(2147483647)) "
                        + "WITH ('connector' = 'print')");

        final Table sourceTable = tableEnv.from("SourceTable");
        sourceTable.executeInsert("SinkTable");
    }
}{code}
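For reference, each line of the data file is a standalone JSON object. Reconstructed
from the table schema and the printed rows below (the attached files are
authoritative), the first line should look like this:
{code:title=input.json, first line (reconstructed)|borderStyle=solid}
{"my_field1": 4646464, "my_field2": 654, "my_field3": "test0"}
{code}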
I also added a few more lines to the data file so that it contained 30 lines.
Then I ran the code with three different inputs (the data files can be found in
the attachments):
# input.json (without compression)
# input.json.gz
# input.json.zip
There were no errors for any of the runs, and the outputs were as follows:
{code:java|title=input.json|borderStyle=solid}
4> +I[4646464, 654, test0]
4> +I[4646464, 654, test1]
4> +I[4646464, 654, test2]
4> +I[4646464, 654, test3]
4> +I[4646464, 654, test4]
4> +I[4646464, 654, test5]
4> +I[4646464, 654, test6]
4> +I[4646464, 654, test7]
4> +I[4646464, 654, test8]
4> +I[4646464, 654, test9]
4> +I[4646464, 654, test10]
4> +I[4646464, 654, test11]
4> +I[4646464, 654, test12]
4> +I[4646464, 654, test13]
4> +I[4646464, 654, test14]
4> +I[4646464, 654, test15]
4> +I[4646464, 654, test16]
4> +I[4646464, 654, test17]
4> +I[4646464, 654, test18]
4> +I[4646464, 654, test19]
4> +I[4646464, 654, test20]
4> +I[4646464, 654, test21]
4> +I[4646464, 654, test22]
4> +I[4646464, 654, test23]
4> +I[4646464, 654, test24]
4> +I[4646464, 654, test25]
4> +I[4646464, 654, test26]
4> +I[4646464, 654, test27]
4> +I[4646464, 654, test28]
4> +I[4646464, 654, test29]{code}
{code:java|title=input.json.gz|borderStyle=solid}
4> +I[4646464, 654, test0]
4> +I[4646464, 654, test1]
4> +I[4646464, 654, test2] {code}
For input.json.zip there was no output at all: the job exited silently,
producing no records and no errors. I would still keep this ticket as a bug.
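As a side note, a quick way to rule out a corrupt archive is to decompress the
file outside Flink with the plain JDK. If this prints 30 for input.json.gz, the
truncation happens in the connector, not in the file (a minimal sketch; the
class name is mine):
{code:java|title=GzipSanityCheck.java|borderStyle=solid}
package examples.flinktest;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.zip.GZIPInputStream;

public class GzipSanityCheck {

    public static void main(final String[] args) throws Exception {
        // Decompress with the plain JDK, bypassing Flink entirely.
        try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                new GZIPInputStream(Files.newInputStream(Paths.get(args[0]))),
                StandardCharsets.UTF_8))) {
            // For the attached input.json.gz this should print 30.
            System.out.println("lines decompressed: " + reader.lines().count());
        }
    }
}{code}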
> Unable to read all records from compressed line-delimited JSON files using
> Table API
> ------------------------------------------------------------------------------------
>
> Key: FLINK-30314
> URL: https://issues.apache.org/jira/browse/FLINK-30314
> Project: Flink
> Issue Type: Improvement
> Components: API / Core
> Affects Versions: 1.16.0, 1.15.2
> Reporter: Dmitry Yaraev
> Priority: Major
> Attachments: input.json, input.json.gz, input.json.zip
>
>
> I am reading gzipped line-delimited JSON files in batch mode using the
> [FileSystem
> Connector|https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/filesystem/].
> For reading the files, a new table is created with the following
> configuration:
> {code:sql}
> CREATE TEMPORARY TABLE `my_database`.`my_table` (
> `my_field1` BIGINT,
> `my_field2` INT,
> `my_field3` VARCHAR(2147483647)
> ) WITH (
> 'connector' = 'filesystem',
> 'path' = 'path-to-input-dir',
> 'format' = 'json',
> 'json.ignore-parse-errors' = 'false',
> 'json.fail-on-missing-field' = 'true'
> ) {code}
> In the input directory I have two files: input-00000.json.gz and
> input-00001.json.gz. As the filenames suggest, the files are compressed with
> GZIP. Each of the files contains 10 records. The issue is that only 2
> records from each file are read (4 in total). If decompressed versions of
> the same data files are used, all 20 records are read.
> As far as I understand, the problem may be related to the fact that the
> split length used when reading the files is in fact the length of the
> compressed file. So files are closed before all records are read from them,
> because the read position of the decompressed stream exceeds the split
> length.
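> For example, if input-00000.json.gz were, say, 300 bytes on disk while its
> 10 records decompress to a few kilobytes, the split would get length 300 and
> reading would stop once the decompressed position passes byte 300, i.e.
> after the first couple of records. A simplified sketch of the suspected stop
> condition (pseudocode of this hypothesis, not actual Flink source):
> {code:java}
> // split.getLength() is the size of the COMPRESSED file on disk, while
> // stream.getPos() advances in DECOMPRESSED bytes, so this loop exits
> // long before the actual end of the stream.
> while (stream.getPos() < split.getOffset() + split.getLength()) {
>     emit(readRecord(stream));
> }
> {code}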
> Probably, it makes sense to add a flag to {{FSDataInputStream}}, so we
> could tell whether the file is compressed or not. The flag could be set to
> true in {{InputStreamFSInputWrapper}}, because that class is used for
> wrapping compressed file streams. With such a flag it would be possible to
> recognize non-splittable compressed files and rely only on the end of the
> stream.
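> A rough sketch of what that could look like (simplified stand-ins for the
> real classes; the method name isCompressedStream() is hypothetical):
> {code:java}
> import java.io.IOException;
> import java.io.InputStream;
>
> abstract class FSDataInputStream extends InputStream {
>     public abstract void seek(long desired) throws IOException;
>     public abstract long getPos() throws IOException;
>
>     /** True if this stream decompresses a non-splittable compressed file. */
>     public boolean isCompressedStream() {
>         return false; // plain files keep the current split-based behavior
>     }
> }
>
> class InputStreamFSInputWrapper extends FSDataInputStream {
>     private final InputStream in; // e.g. a GZIPInputStream
>     private long pos;
>
>     InputStreamFSInputWrapper(final InputStream in) {
>         this.in = in;
>     }
>
>     @Override
>     public boolean isCompressedStream() {
>         return true; // this wrapper only wraps decompressing streams
>     }
>
>     @Override
>     public int read() throws IOException {
>         final int b = in.read();
>         if (b >= 0) {
>             pos++;
>         }
>         return b;
>     }
>
>     @Override
>     public long getPos() {
>         return pos;
>     }
>
>     @Override
>     public void seek(final long desired) throws IOException {
>         throw new IOException("seek is not supported on compressed streams");
>     }
> }
> {code}
> With such a flag, the reader could check {{isCompressedStream()}} and read
> until EOF instead of honoring the split length.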