[ 
https://issues.apache.org/jira/browse/FLINK-25311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Caizhi Weng updated FLINK-25311:
--------------------------------
    Description: 
This is reported from the [user mailing 
list|https://lists.apache.org/thread/y854gjxyomtypcs8x4f88pttnl9k0j9q].

Run the following test to reproduce this bug.
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;

import org.junit.Test;

public class MyTest {

    @Test
    public void myTest() throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
        TableEnvironment tEnv = TableEnvironmentImpl.create(settings);
        tEnv.executeSql(
                        "create table T1 ( a INT ) with ( 'connector' = 
'filesystem', 'format' = 'json', 'path' = '/tmp/gao.json' )")
                .await();
        tEnv.executeSql(
                        "create table T2 ( a INT ) with ( 'connector' = 
'filesystem', 'format' = 'json', 'path' = '/tmp/gao.gz' )")
                .await();
        tEnv.executeSql("select count(*) from T1 UNION ALL select count(*) from 
T2").print();
    }
}
{code}

Data files used are attached in the attachment.

The result is
{code}
+----------------------+
|               EXPR$0 |
+----------------------+
|                  100 |
|                   24 |
+----------------------+
{code}

which is obviously incorrect.

This is because {{DelimitedInputFormat#fillBuffer}} cannot deal with compressed 
files correctly. It limits the number of (uncompressed) bytes read with 
{{splitLength}}, while {{splitLength}} is the length of compressed bytes, so 
they cannot match.

  was:
This is reported in the [user mailing 
list|https://lists.apache.org/thread/y854gjxyomtypcs8x4f88pttnl9k0j9q]

Run the following test to reproduce this bug.
{code:java}
import org.apache.flink.table.api.EnvironmentSettings;

import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;

import org.junit.Test;

public class MyTest {

    @Test
    public void myTest() throws Exception {
        EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
        TableEnvironment tEnv = TableEnvironmentImpl.create(settings);
        tEnv.executeSql(
                        "create table T1 ( a INT ) with ( 'connector' = 
'filesystem', 'format' = 'json', 'path' = '/tmp/gao.json' )")
                .await();
        tEnv.executeSql(
                        "create table T2 ( a INT ) with ( 'connector' = 
'filesystem', 'format' = 'json', 'path' = '/tmp/gao.gz' )")
                .await();
        tEnv.executeSql("select count(*) from T1 UNION ALL select count(*) from 
T2").print();
    }
}
{code}

Data files used are attached in the attachment.

The result is
{code}
+----------------------+
|               EXPR$0 |
+----------------------+
|                  100 |
|                   24 |
+----------------------+
{code}

which is obviously incorrect.

This is because {{DelimitedInputFormat#fillBuffer}} cannot deal with compressed 
files correctly. It limits the number of (uncompressed) bytes read with 
{{splitLength}}, while {{splitLength}} is the length of compressed bytes, so 
they cannot match.


> DelimitedInputFormat cannot read compressed files correctly
> -----------------------------------------------------------
>
>                 Key: FLINK-25311
>                 URL: https://issues.apache.org/jira/browse/FLINK-25311
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Core
>    Affects Versions: 1.14.2
>            Reporter: Caizhi Weng
>            Priority: Major
>         Attachments: gao.gz, gao.json
>
>
> This is reported from the [user mailing 
> list|https://lists.apache.org/thread/y854gjxyomtypcs8x4f88pttnl9k0j9q].
> Run the following test to reproduce this bug.
> {code:java}
> import org.apache.flink.table.api.EnvironmentSettings;
> import org.apache.flink.table.api.TableEnvironment;
> import org.apache.flink.table.api.internal.TableEnvironmentImpl;
> import org.junit.Test;
> public class MyTest {
>     @Test
>     public void myTest() throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.inBatchMode();
>         TableEnvironment tEnv = TableEnvironmentImpl.create(settings);
>         tEnv.executeSql(
>                         "create table T1 ( a INT ) with ( 'connector' = 
> 'filesystem', 'format' = 'json', 'path' = '/tmp/gao.json' )")
>                 .await();
>         tEnv.executeSql(
>                         "create table T2 ( a INT ) with ( 'connector' = 
> 'filesystem', 'format' = 'json', 'path' = '/tmp/gao.gz' )")
>                 .await();
>         tEnv.executeSql("select count(*) from T1 UNION ALL select count(*) 
> from T2").print();
>     }
> }
> {code}
> Data files used are attached in the attachment.
> The result is
> {code}
> +----------------------+
> |               EXPR$0 |
> +----------------------+
> |                  100 |
> |                   24 |
> +----------------------+
> {code}
> which is obviously incorrect.
> This is because {{DelimitedInputFormat#fillBuffer}} cannot deal with 
> compressed files correctly. It limits the number of (uncompressed) bytes read 
> with {{splitLength}}, while {{splitLength}} is the length of compressed 
> bytes, so they cannot match.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to