[ https://issues.apache.org/jira/browse/FLINK-38580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18033455#comment-18033455 ]
lajith commented on FLINK-38580:
--------------------------------
A documentation improvement is also needed here:
[https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-todatastream]
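As a reference point for such an improvement, here is a minimal sketch of a single-job toDataStream pipeline (the class name ToDataStreamExample is hypothetical; the table definition is copied from the report below): the Table is converted exactly once and only env.execute() submits a job, so the datagen source is evaluated a single time.
{noformat}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class ToDataStreamExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Same datagen table as in the report below (assumption: identical options).
        tableEnv.executeSql(
            "CREATE TABLE datagen_source ("
                + "  id INT,"
                + "  name STRING"
                + ") WITH ("
                + "  'connector' = 'datagen',"
                + "  'rows-per-second' = '5',"
                + "  'fields.id.kind' = 'sequence',"
                + "  'fields.id.start' = '1',"
                + "  'fields.id.end' = '10',"
                + "  'fields.name.length' = '10'"
                + ")");

        Table table = tableEnv.sqlQuery("SELECT * FROM datagen_source");

        // Convert the Table once and print only from the DataStream side.
        // The single env.execute() call below submits one job, so the datagen
        // source is evaluated once and all printed rows come from that one run.
        DataStream<Row> dataStream = tableEnv.toDataStream(table);
        dataStream.print();

        env.execute("toDataStream example");
    }
}
{noformat}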
> Inconsistent data between Table API and DataStream API when using datagen connector
> -------------------------------------------------------------------------------------
>
> Key: FLINK-38580
> URL: https://issues.apache.org/jira/browse/FLINK-38580
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.20.2
> Reporter: lajith
> Priority: Major
>
> When a table backed by the datagen connector is created via the Table API and then
> converted to a DataStream with toDataStream(), the data values observed from the Table
> differ from the values observed after the conversion to a DataStream. This inconsistency
> occurs only with the datagen connector and not with other connectors such as Kafka.
> Steps to Reproduce:
> 1. Create a Table using SQL with the datagen connector
> 2. Execute the Table and observe the generated data
> 3. Convert the same Table to DataStream using toDataStream()
> 4. Execute the DataStream and observe the data
> 5. Notice that the data values are different between steps 2 and 4
> Sample code:
> {noformat}
> import org.apache.flink.streaming.api.datastream.DataStream;
> import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
> import org.apache.flink.table.api.Table;
> import org.apache.flink.table.api.TableResult;
> import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
> import org.apache.flink.types.Row;
>
> // Step 1: Set up environments
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
> // Step 2: Create datagen source table
> tableEnv.executeSql(
>     "CREATE TABLE datagen_source ("
>         + "  id INT,"
>         + "  name STRING"
>         + ") WITH ("
>         + "  'connector' = 'datagen',"
>         + "  'rows-per-second' = '5',"
>         + "  'fields.id.kind' = 'sequence',"
>         + "  'fields.id.start' = '1',"
>         + "  'fields.id.end' = '10',"
>         + "  'fields.name.length' = '10'"
>         + ")");
>
> // Step 3: Query the table and execute it
> Table table = tableEnv.sqlQuery("SELECT * FROM datagen_source");
> TableResult result = table.execute();
>
> // Step 4: Print the Table result
> result.print();
>
> // Step 5: Convert to DataStream
> DataStream<Row> dataStream = tableEnv.toDataStream(table);
>
> // Step 6: Print the DataStream (after conversion)
> dataStream.print("After Conversion");
>
> // Step 7: Execute the job
> env.execute("Datagen SQL to DataStream Example");{noformat}
> Observed Behavior:
> {noformat}
> Table results
> +----+-------------+--------------------------------+
> | op | id | name |
> +----+-------------+--------------------------------+
> | +I | 1 | 0deb105a16 |
> | +I | 2 | badce9aae9 |
> | +I | 3 | e47ab6e948 |
> | +I | 4 | ce6ff2d766 |
> | +I | 5 | 375c6d4d80 |
> | +I | 6 | 32db6d87b0 |
> | +I | 7 | 9268b4b5cd |
> | +I | 8 | 026b54358c |
> | +I | 9 | 446cb47153 |
> | +I | 10 | bc7950df1a |
> +----+-------------+--------------------------------+
> 10 rows in set
> After conversion to DataStream:
> After Conversion:1> +I[1, 63faf42e44]
> After Conversion:2> +I[2, 6d5e0b4e5c]
> After Conversion:3> +I[3, 53ecd6883a]
> After Conversion:4> +I[4, 55eb56b2bf]
> After Conversion:6> +I[6, 6d5feb43b2]
> After Conversion:5> +I[5, 70daedf4f5]
> After Conversion:7> +I[7, c1ee369d41]
> After Conversion:8> +I[8, 14cbcf03d8]
> After Conversion:9> +I[9, 707854f4ed]
> After Conversion:10> +I[10, 86cf8d8c6b] {noformat}