[ https://issues.apache.org/jira/browse/FLINK-38580?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18033455#comment-18033455 ]

lajith commented on FLINK-38580:
--------------------------------

The documentation here also needs to be improved:
[https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/data_stream_api/#examples-for-todatastream]
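For reference, a minimal sketch (class name and job names are placeholders, and it reuses the datagen table definition from the report below) of roughly the usage that page covers, where the converted stream is the only pipeline and is run by a single env.execute() call:
{noformat}
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class DatagenToDataStreamSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        // Same datagen table as in the report below.
        tableEnv.executeSql(
            "CREATE TABLE datagen_source (" +
            "  id INT," +
            "  name STRING" +
            ") WITH (" +
            "  'connector' = 'datagen'," +
            "  'rows-per-second' = '5'," +
            "  'fields.id.kind' = 'sequence'," +
            "  'fields.id.start' = '1'," +
            "  'fields.id.end' = '10'," +
            "  'fields.name.length' = '10'" +
            ")");

        // Convert to a DataStream and execute only this pipeline, so the
        // datagen source is read by a single job submission.
        Table table = tableEnv.sqlQuery("SELECT * FROM datagen_source");
        DataStream<Row> dataStream = tableEnv.toDataStream(table);
        dataStream.print("toDataStream");

        env.execute("Datagen toDataStream sketch");
    }
}
{noformat}
If the difference in the report comes from table.execute() and env.execute() being two separate job submissions that each read the datagen source independently (my assumption), spelling that out next to the toDataStream examples would avoid this confusion.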

> Inconsistent data between Table API and DataStream API when using datagen connector
> ------------------------------------------------------------------------------------
>
>                 Key: FLINK-38580
>                 URL: https://issues.apache.org/jira/browse/FLINK-38580
>             Project: Flink
>          Issue Type: Bug
>          Components: API / DataStream
>    Affects Versions: 1.20.2
>            Reporter: lajith
>            Priority: Major
>
> When using the datagen connector with the Table API and then converting to the
> DataStream API via toDataStream(), the data values change unexpectedly. This
> inconsistency only occurs with the datagen connector and not with other
> connectors such as Kafka.
> Steps to Reproduce:
> 1. Create a Table using SQL with the datagen connector
> 2. Execute the Table and observe the generated data
> 3. Convert the same Table to DataStream using toDataStream()
> 4. Execute the DataStream and observe the data
> 5. Notice that the data values are different between steps 2 and 4
> Sample code:
> {noformat}
>         // Step 1: Set up environments
>         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>         StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
>
>         // Step 2: Create the datagen source table
>         tableEnv.executeSql(
>             "CREATE TABLE datagen_source (" +
>             "  id INT," +
>             "  name STRING" +
>             ") WITH (" +
>             "  'connector' = 'datagen'," +
>             "  'rows-per-second' = '5'," +
>             "  'fields.id.kind' = 'sequence'," +
>             "  'fields.id.start' = '1'," +
>             "  'fields.id.end' = '10'," +
>             "  'fields.name.length' = '10'" +
>             ")");
>
>         // Step 3: Apply SQL
>         Table table = tableEnv.sqlQuery("SELECT * FROM datagen_source");
>         TableResult result = table.execute();
>
>         // Step 4: Print the table
>         result.print();
>
>         // Step 5: Convert to DataStream
>         DataStream<Row> dataStream = tableEnv.toDataStream(table);
>
>         // Step 6: Print the DataStream (after conversion)
>         dataStream.print("After Conversion");
>
>         // Step 7: Execute the job
>         env.execute("Datagen SQL to DataStream Example");
> {noformat}
> Observed Behavior:
> {noformat}
> Table results
> +----+-------------+--------------------------------+
> | op |          id |                           name |
> +----+-------------+--------------------------------+
> | +I |           1 |                     0deb105a16 |
> | +I |           2 |                     badce9aae9 |
> | +I |           3 |                     e47ab6e948 |
> | +I |           4 |                     ce6ff2d766 |
> | +I |           5 |                     375c6d4d80 |
> | +I |           6 |                     32db6d87b0 |
> | +I |           7 |                     9268b4b5cd |
> | +I |           8 |                     026b54358c |
> | +I |           9 |                     446cb47153 |
> | +I |          10 |                     bc7950df1a |
> +----+-------------+--------------------------------+
> 10 rows in set
> After conversion to DataStream:
> After Conversion:1> +I[1, 63faf42e44]
> After Conversion:2> +I[2, 6d5e0b4e5c]
> After Conversion:3> +I[3, 53ecd6883a]
> After Conversion:4> +I[4, 55eb56b2bf]
> After Conversion:6> +I[6, 6d5feb43b2]
> After Conversion:5> +I[5, 70daedf4f5]
> After Conversion:7> +I[7, c1ee369d41]
> After Conversion:8> +I[8, 14cbcf03d8]
> After Conversion:9> +I[9, 707854f4ed]
> After Conversion:10> +I[10, 86cf8d8c6b] {noformat}


