[
https://issues.apache.org/jira/browse/FLINK-25311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Caizhi Weng updated FLINK-25311:
--------------------------------
Description:
This was reported on the [user mailing
list|https://lists.apache.org/thread/y854gjxyomtypcs8x4f88pttnl9k0j9q].
Run the following test to reproduce the bug.
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;

import org.junit.Test;

public class MyTest {

    @Test
    public void myTest() throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
        TableEnvironment tEnv = TableEnvironmentImpl.create(settings);

        // T1 reads the plain JSON file.
        tEnv.executeSql(
                        "create table T1 ( a INT ) with ( 'connector' = 'filesystem', "
                                + "'format' = 'json', 'path' = '/tmp/gao.json' )")
                .await();

        // T2 reads the gzip-compressed file with the same records.
        tEnv.executeSql(
                        "create table T2 ( a INT ) with ( 'connector' = 'filesystem', "
                                + "'format' = 'json', 'path' = '/tmp/gao.gz' )")
                .await();

        // Both counts should be the same.
        tEnv.executeSql("select count(*) from T1 UNION ALL select count(*) from T2").print();
    }
}
{code}
The data files used ({{gao.json}} and {{gao.gz}}) are attached to this issue.
The result is
{code}
+----------------------+
| EXPR$0 |
+----------------------+
| 100 |
| 24 |
+----------------------+
{code}
which is incorrect: the two tables read the same records, so both counts should match.

This is because {{DelimitedInputFormat#fillBuffer}} does not handle compressed files
correctly. It caps the number of bytes read from the (decompressed) input stream at
{{splitLength}}, but {{splitLength}} is the length of the compressed bytes, so the
limit is exhausted long before all decompressed records have been read.
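
To make the mismatch concrete, here is a minimal, self-contained sketch using only the
JDK (the class and variable names are hypothetical, not Flink's actual {{fillBuffer}}
internals). It caps reads from a gzip-decompressing stream at the *compressed* length,
the way the faulty limit does, and ends up seeing only a fraction of the 100 records:
{code:java}
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo, not Flink code: shows why capping reads from a
// decompressing stream by the *compressed* length truncates the input.
public class SplitLengthMismatchDemo {

    public static void main(String[] args) throws Exception {
        // 100 JSON records, one per line, like the attached gao.json.
        ByteArrayOutputStream raw = new ByteArrayOutputStream();
        for (int i = 0; i < 100; i++) {
            raw.write(("{\"a\": " + i + "}\n").getBytes(StandardCharsets.UTF_8));
        }

        // Gzip them; the compressed size plays the role of splitLength,
        // because the file split is created from the compressed file.
        ByteArrayOutputStream gz = new ByteArrayOutputStream();
        try (GZIPOutputStream out = new GZIPOutputStream(gz)) {
            out.write(raw.toByteArray());
        }
        long splitLength = gz.size();

        // Faulty limit: stop after splitLength bytes, even though read()
        // returns *decompressed* bytes.
        InputStream in = new GZIPInputStream(new ByteArrayInputStream(gz.toByteArray()));
        byte[] buffer = new byte[1024];
        long bytesRead = 0;
        int records = 0;
        while (bytesRead < splitLength) {
            int toRead = (int) Math.min(buffer.length, splitLength - bytesRead);
            int n = in.read(buffer, 0, toRead);
            if (n < 0) {
                break;
            }
            for (int i = 0; i < n; i++) {
                if (buffer[i] == '\n') {
                    records++;
                }
            }
            bytesRead += n;
        }

        // splitLength (compressed) is much smaller than the decompressed
        // size, so far fewer than 100 records are counted.
        System.out.println("splitLength (compressed bytes) = " + splitLength);
        System.out.println("decompressed bytes             = " + raw.size());
        System.out.println("records read under the limit   = " + records);
    }
}
{code}
Since the gzipped payload is much smaller than the decompressed one, the loop stops
well before the last record, consistent with the truncated {{T2}} count above.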
> DelimitedInputFormat cannot read compressed files correctly
> -----------------------------------------------------------
>
> Key: FLINK-25311
> URL: https://issues.apache.org/jira/browse/FLINK-25311
> Project: Flink
> Issue Type: Bug
> Components: API / Core
> Affects Versions: 1.14.2
> Reporter: Caizhi Weng
> Priority: Major
> Attachments: gao.gz, gao.json
--
This message was sent by Atlassian Jira
(v8.20.1#820001)