[ 
https://issues.apache.org/jira/browse/FLINK-29039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Marco A. Villalobos updated FLINK-29039:
----------------------------------------
    Description: 
RowData produced by LineBytesInputFormat is reused, but 
DeserializationSchemaAdapter#Reader only shallow-copies the produced data, so every 
buffered row references the same reused instance and the result is always the last row's value.

 

Given this program:

```java
package mvillalobos.bug;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class IsThisABatchSQLBug {

  public static void main(String[] args) {
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setRuntimeMode(RuntimeExecutionMode.BATCH);
     final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
     tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
           "        `file.path`              STRING NOT NULL METADATA,\n" +
           "        `file.name`              STRING NOT NULL METADATA,\n" +
           "        `file.size`              BIGINT NOT NULL METADATA,\n" +
           "        `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +
           "        line                    STRING\n" +
           "      ) WITH (\n" +
           "        'connector' = 'filesystem', \n" +
           "        'format' = 'raw'\n" +
           "      );");
     tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
           "      WITH (\n" +
           "        'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +
           "      ) LIKE historical_raw_source_template;");

     final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute();
     output.print();
  }
}
```

and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data' 
directory:

```text
one
two
three
four
five
six
seven
eight
nine
ten
```

The print results are:

```text
+----+--------------------------------+
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
+----+--------------------------------+
10 rows in set
```
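
For reference, the failure pattern described above can be reduced to a minimal, self-contained Java sketch. This is an illustration only, not Flink code: `MutableRecord` and the buffering loop below are hypothetical stand-ins for the reused RowData instance and the reader's internal buffer.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Minimal sketch (hypothetical classes, not Flink internals) of the
 * object-reuse pitfall: a format that reuses one mutable record instance,
 * combined with a reader that buffers references instead of copies,
 * makes every buffered element show the last value read.
 */
public class ReusePitfallSketch {

    /** Stand-in for a reused, mutable record such as a RowData instance. */
    static final class MutableRecord {
        String value;
    }

    public static void main(String[] args) {
        String[] lines = {"one", "two", "three"};

        // The "format" reuses a single record instance for every line it reads.
        MutableRecord reused = new MutableRecord();

        List<MutableRecord> shallowBuffer = new ArrayList<>();
        List<MutableRecord> copiedBuffer = new ArrayList<>();

        for (String line : lines) {
            reused.value = line;

            // Shallow buffering: every element is the same object,
            // so after the loop they all report the last value ("three").
            shallowBuffer.add(reused);

            // Defensive copy before buffering preserves each value.
            MutableRecord copy = new MutableRecord();
            copy.value = reused.value;
            copiedBuffer.add(copy);
        }

        shallowBuffer.forEach(r -> System.out.println("shallow: " + r.value)); // three, three, three
        copiedBuffer.forEach(r -> System.out.println("copied:  " + r.value));  // one, two, three
    }
}
```

Under that assumption, the fix direction would be to deep-copy each produced row before it is handed to the buffering reader (for example via the row type's serializer), rather than buffering the reused reference.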

  was:
RowData produced by LineBytesInputFormat is reused, but 
DeserializationSchemaAdapter#Reader only shallow-copies the produced data, so the 
result will always be the last row's value.

 

Given this program:

```java
package mvillalobos.bug;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

import static org.apache.flink.table.api.Expressions.$;

public class IsThisABatchSQLBug {

  public static void main(String[] args) {
     final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     env.setRuntimeMode(RuntimeExecutionMode.BATCH);
     final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
     tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
           "        `file.path`              STRING NOT NULL METADATA,\n" +
           "        `file.name`              STRING NOT NULL METADATA,\n" +
           "        `file.size`              BIGINT NOT NULL METADATA,\n" +
           "        `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL METADATA,\n" +
           "        line                    STRING\n" +
           "      ) WITH (\n" +
           "        'connector' = 'filesystem', \n" +
           "        'format' = 'raw'\n" +
           "      );");
     tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
           "      WITH (\n" +
           "        'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n" +
           "      ) LIKE historical_raw_source_template;");

     final TableResult output = tableEnv.from("historical_raw_source").select($("line")).execute();
     output.print();
  }
}
```

and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data' 
directory:

```text
one
two
three
four
five
six
seven
eight
nine
ten
```

The print results are:

```text
+----+--------------------------------+
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
| +I |                            ten |
+----+--------------------------------+
10 rows in set
```


> RowData produced by LineBytesInputFormat is reused, but 
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus 
> result will always be the last row value
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-29039
>                 URL: https://issues.apache.org/jira/browse/FLINK-29039
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / FileSystem
>    Affects Versions: 1.15.1
>         Environment: This issue was discovered on MacOS Big Sur.
>            Reporter: Marco A. Villalobos
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
