[
https://issues.apache.org/jira/browse/FLINK-29039?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Marco A. Villalobos updated FLINK-29039:
----------------------------------------
Description:
RowData produced by LineBytesInputFormat is reused, but
DeserializationSchemaAdapter#Reader only shallow-copies the produced data, so
every emitted row ends up holding the value of the last row read.
Given this program:
{code:java}
package mvillalobos.bug;import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import
static org.apache.flink.table.api.Expressions.$;public class IsThisABatchSQLBug
{ public static void main(String[] args) {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
" `file.path` STRING NOT NULL METADATA,\n" +
" `file.name` STRING NOT NULL METADATA,\n" +
" `file.size` BIGINT NOT NULL METADATA,\n" +
" `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL
METADATA,\n" +
" line STRING\n" +
" ) WITH (\n" +
" 'connector' = 'filesystem', \n" +
" 'format' = 'raw'\n" +
" );");
tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
" WITH (\n" +
" 'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n"
+
" ) LIKE historical_raw_source_template;"); final
TableResult output =
tableEnv.from("historical_raw_source").select($("line")).execute();
output.print();
}
} {code}
and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data'
directory:
{code:java}
one
two
three
four
five
six
seven
eight
nine
ten {code}
The print results are:
{code:java}
+----+--------------------------------+
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
| +I | ten |
+----+--------------------------------+
10 rows in set {code}
was:
RowData produced by LineBytesInputFormat is reused, but
DeserializationSchemaAdapter#Reader only shallow copies produced data, thus
result will always be the last row value.
Given this program:
{{```java}}
{{}}
package mvillalobos.bug;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import static org.apache.flink.table.api.Expressions.$;
public class IsThisABatchSQLBug {
public static void main(String[] args) {
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
" `file.path` STRING NOT NULL METADATA,\n" +
" `file.name` STRING NOT NULL METADATA,\n" +
" `file.size` BIGINT NOT NULL METADATA,\n" +
" `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL
METADATA,\n" +
" line STRING\n" +
" ) WITH (\n" +
" 'connector' = 'filesystem', \n" +
" 'format' = 'raw'\n" +
" );");
tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
" WITH (\n" +
" 'path' = '/Users/minmay/dev/mvillalobos/historical/data'\n"
+
" ) LIKE historical_raw_source_template;");
final TableResult output =
tableEnv.from("historical_raw_source").select($("line")).execute();
output.print();
}
}
{{{}{}}}{{{}{}}}{{{}```{}}}
and this sample.csv file in the '/Users/minmay/dev/mvillalobos/historical/data'
directory:
{{```text}}
{{one}}
{{two}}
{{three}}
{{four}}
{{five}}
{{six}}
{{seven}}
{{eight}}
{{nine}}
{{ten}}
{{```}}
{{{{{}The print results are:{}}}}}
{{{{{}```text{}}}}}
{{{+}---{-}{-}{+}-------------------------------+}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{| +I | ten |}}
{{{+}---{-}{-}{+}-------------------------------+}}
{{{{{}10 rows in set{}}}}}
{{{{{}```{}}}}}
> RowData produced by LineBytesInputFormat is reused, but
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus
> result will always be the last row value
> ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: FLINK-29039
> URL: https://issues.apache.org/jira/browse/FLINK-29039
> Project: Flink
> Issue Type: Bug
> Components: Connectors / FileSystem
> Affects Versions: 1.15.1
> Environment: This issue was discovered on MacOS Big Sur.
> Reporter: Marco A. Villalobos
> Priority: Major
>
> RowData produced by LineBytesInputFormat is reused, but
> DeserializationSchemaAdapter#Reader only shallow copies produced data, thus
> result will always be the last row value.
>
> Given this program:
> {code:java}
> package mvillalobos.bug;import
> org.apache.flink.api.common.RuntimeExecutionMode;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;import
> static org.apache.flink.table.api.Expressions.$;public class
> IsThisABatchSQLBug { public static void main(String[] args) {
> final StreamExecutionEnvironment env =
> StreamExecutionEnvironment.getExecutionEnvironment();
> env.setRuntimeMode(RuntimeExecutionMode.BATCH);
> final StreamTableEnvironment tableEnv =
> StreamTableEnvironment.create(env);
> tableEnv.executeSql("CREATE TABLE historical_raw_source_template(\n" +
> " `file.path` STRING NOT NULL METADATA,\n" +
> " `file.name` STRING NOT NULL METADATA,\n" +
> " `file.size` BIGINT NOT NULL METADATA,\n" +
> " `file.modification-time` TIMESTAMP_LTZ(3) NOT NULL
> METADATA,\n" +
> " line STRING\n" +
> " ) WITH (\n" +
> " 'connector' = 'filesystem', \n" +
> " 'format' = 'raw'\n" +
> " );");
> tableEnv.executeSql("CREATE TABLE historical_raw_source\n" +
> " WITH (\n" +
> " 'path' =
> '/Users/minmay/dev/mvillalobos/historical/data'\n" +
> " ) LIKE historical_raw_source_template;"); final
> TableResult output =
> tableEnv.from("historical_raw_source").select($("line")).execute();
> output.print();
> }
> } {code}
> and this sample.csv file in the
> '/Users/minmay/dev/mvillalobos/historical/data' directory:
> {code:java}
> one
> two
> three
> four
> five
> six
> seven
> eight
> nine
> ten {code}
> {{The print results are:}}
> {{}}
> {code:java}
> +----+--------------------------------+
> | +I | ten |
> | +I | ten |
> | +I | ten |
> | +I | ten |
> | +I | ten |
> | +I | ten |
> | +I | ten |
> | +I | ten |
> | +I | ten |
> | +I | ten |
> +----+--------------------------------+
> 10 rows in set {code}
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)