[
https://issues.apache.org/jira/browse/FLINK-29039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Marco A. Villalobos updated FLINK-29039:
----------------------------------------
Description:
RowData produced by LineBytesInputFormat is reused, but
DeserializationSchemaAdapter#Reader only makes a shallow copy of each produced
row, so every row in the result ends up holding the value of the last row read.
Given this program:
{{```java}}
{{package mvillalobos.bug;}}
{{import org.apache.flink.api.common.RuntimeExecutionMode;}}
{{import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;}}
{{import org.apache.flink.table.api.TableResult;}}
{{import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;}}
{{import static org.apache.flink.table.api.Expressions.$;}}
{{public class IsThisABatchSQLBug {}}
{{ public static void main(String[] args) {}}
{{ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();}}
{{ env.setRuntimeMode(RuntimeExecutionMode.BATCH);}}
{{ final StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);}}
{{ tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +}}
{{ " `file.path` STRING NOT NULL METADATA,\n" +}}
{{ " `file.name` STRING NOT NULL METADATA,\n" +}}
{{ " `file.size` BIGINT NOT NULL METADATA,\n" +}}
{{ " `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +}}
{{ " line STRING\n" +}}
{{ " ) WITH (\n" +}}
{{ " 'connector' = 'filesystem', \n" +}}
{{ " 'format' = 'raw'\n" +}}
{{ " );");}}
{{ tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +}}
{{ " WITH (\n" +}}
{{ " 'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +}}
{{ " ) LIKE historical_raw_source_template;");}}
{{ final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute();}}
{{ output.print();}}
{{ }}}
{{}}}
{{```}}
and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data'
directory:
{{```text}}
{{one}}
{{two}}
{{three}}
{{four}}
{{five}}
{{six}}
{{seven}}
{{eight}}
{{nine}}
{{ten}}
{{```}}
The print results are:
{{```text}}
{{+----+--------------------------------+}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{+----+--------------------------------+}}
{{10 rows in set}}
{{```}}
was:
RowData produced by LineBytesInputFormat is reused, but
DeserializationSchemaAdapter#Reader only shallow copies produced data, thus
result will always be the last row value.
Given this program:
{{```java}}
{{package mvillalobos.bug;}}
{{import org.apache.flink.api.common.RuntimeExecutionMode;}}
{{import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;}}
{{import org.apache.flink.table.api.TableResult;}}
{{import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;}}
{{import static org.apache.flink.table.api.Expressions.$;}}
{{public class IsThisABatchSQLBug {}}
{{ public static void main(String[] args) {}}
{{ final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();}}
{{ env.setRuntimeMode(RuntimeExecutionMode.BATCH);}}
{{ final StreamTableEnvironment tableEnv =
StreamTableEnvironment.create(env);}}
{{ tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +}}
{{ " `file.path` STRING NOT NULL METADATA,\n" +}}
{{ " `file.name` STRING NOT NULL METADATA,\n" +}}
{{ " `file.size` BIGINT NOT NULL METADATA,\n" +}}
{{ " `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +}}
{{ " line STRING\n" +}}
{{ " ) WITH (\n" +}}
{{ " 'connector' = 'filesystem', \n" +}}
{{ " 'format' = 'raw'\n" +}}
{{ " );");}}
{{ tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +}}
{{ " WITH (\n" +}}
{{ " 'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +}}
{{ " ) LIKE historical_raw_source_template;");}}
{{ final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute();}}
{{ output.print();}}
{{ }}}
{{}}}
{{```}}
and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data'
directory:
{{```text}}
{{one}}
{{two}}
{{three}}
{{four}}
{{five}}
{{six}}
{{seven}}
{{eight}}
{{nine}}
{{ten}}
{{```}}
{{The print results are:}}
{{```text}}
{{+----+--------------------------------+}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{+----+--------------------------------+}}
{{10 rows in set}}
{{```}}
> RowData produced by LineBytesInputFormat is reused, but
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus
> result will always be the last row value
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-29039
> URL: https://issues.apache.org/jira/browse/FLINK-29039
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.15.1
> Environment: This issue was discovered on MacOS Big Sur.
> Reporter: Marco A. Villalobos
> Priority: Major
>
> RowData produced by LineBytesInputFormat is reused, but
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus
> result will always be the last row value.
>
> Given this program:
> {{```java}}
> {{package mvillalobos.bug;}}
> {{import org.apache.flink.api.common.RuntimeExecutionMode;}}
> {{import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;}}
> {{import org.apache.flink.table.api.TableResult;}}
> {{import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;}}
> {{import static org.apache.flink.table.api.Expressions.$;}}
> {{public class IsThisABatchSQLBug {}}
> {{ public static void main(String[] args) {}}
> {{ final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();}}
> {{ env.setRuntimeMode(RuntimeExecutionMode.BATCH);}}
> {{ final StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);}}
> {{ tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +}}
> {{ " `file.path` STRING NOT NULL METADATA,\n" +}}
> {{ " `file.name` STRING NOT NULL METADATA,\n" +}}
> {{ " `file.size` BIGINT NOT NULL METADATA,\n" +}}
> {{ " `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +}}
> {{ " line STRING\n" +}}
> {{ " ) WITH (\n" +}}
> {{ " 'connector' = 'filesystem', \n" +}}
> {{ " 'format' = 'raw'\n" +}}
> {{ " );");}}
> {{ tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +}}
> {{ " WITH (\n" +}}
> {{ " 'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +}}
> {{ " ) LIKE historical_raw_source_template;");}}
> {{ final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute();}}
> {{ output.print();}}
> {{ }}}
> {{}}}
> {{```}}
> and this sample.csv file in the
> '/Users/minmay/dev/mvillalobos/historical/data' directory:
> {{```text}}
> {{one}}
> {{two}}
> {{three}}
> {{four}}
> {{five}}
> {{six}}
> {{seven}}
> {{eight}}
> {{nine}}
> {{ten}}
> {{```}}
> The print results are:
> {{```text}}
> {{+----+--------------------------------+}}
> {{| +I | ten |}}
> {{| +I | ten |}}
> {{| +I | ten |}}
> {{| +I | ten |}}
> {{| +I | ten |}}
> {{| +I | ten |}}
> {{| +I | ten |}}
> {{| +I | ten |}}
> {{| +I | ten |}}
> {{| +I | ten |}}
> {{+----+--------------------------------+}}
> {{10 rows in set}}
> {{```}}
--
This message was sent by Atlassian Jira
(v8.20.10#820010)