[ https://issues.apache.org/jira/browse/FLINK-29855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17628327#comment-17628327 ]

SenBin Lin edited comment on FLINK-29855 at 11/3/22 1:33 PM:
-------------------------------------------------------------

Hi, just for information: I modified the program provided by [~yanxinyi] and changed the generator kind of field `id` from *random* (the default) to *sequence*, as in the following datagen table DDL:

{code:java}
tEnv.executeSql(
        String.format(
                "create table %s (\n"
                        + "id INT \n"
                        + ") with (\n"
                        + "'connector' = 'datagen',\n"
                        + "'number-of-rows' = '10',\n"
                        + "'rows-per-second' = '1',\n"
                        + "'fields.id.kind' = 'sequence',\n"
                        + "'fields.id.start' = '0',\n"
                        + "'fields.id.end' = '100'\n"
                        + ")",
                "datagenTable"));
{code}

The rest of the code is basically unchanged. I first tried to examine the records in the datagen table:

{code:java}
tEnv.executeSql("select `id` from datagenTable").print();
{code}

The output is as expected, as shown below.

{noformat}
+----+-------------+
| op |          id |
+----+-------------+
| +I |           0 |
| +I |           1 |
| +I |           2 |
| +I |           3 |
| +I |           7 |
| +I |           6 |
| +I |           5 |
| +I |           4 |
| +I |           9 |
| +I |           8 |
{noformat}

 

The key log output of the main program is shown below:

{noformat}
20:41:24,205 INFO  osp.udf.IntInputUdf [] - *** *** input bytes [49] and num 1.
20:41:24,207 INFO  osp.udf.IntInputUdf [] - ### ### input bytes [48] and num 0. +++ DEBUG +++ duplicated ?
20:41:24,207 INFO  osp.udf.IntInputUdf [] - ### ### input bytes [48] and num 0. +++ DEBUG +++ duplicated ?
1> +I[[48], 0]
20:41:24,233 INFO  osp.udf.IntInputUdf [] - ### ### input bytes [50] and num 2. +++ DEBUG +++ duplicated ?
20:41:24,233 INFO  osp.udf.IntInputUdf [] - ### ### input bytes [50] and num 2. +++ DEBUG +++ duplicated ?
3> +I[[50], 2]
20:41:24,233 INFO  osp.udf.IntInputUdf [] - *** *** input bytes [51] and num 3.
20:41:25,204 INFO  osp.udf.IntInputUdf [] - ### ### input bytes [52] and num 4. +++ DEBUG +++ duplicated ?
20:41:25,204 INFO  osp.udf.IntInputUdf [] - ### ### input bytes [52] and num 4. +++ DEBUG +++ duplicated ?
1> +I[[52], 4]
20:41:25,204 INFO  osp.udf.IntInputUdf [] - *** *** input bytes [53] and num 5.
20:41:25,232 INFO  osp.udf.IntInputUdf [] - *** *** input bytes [55] and num 7.
20:41:25,233 INFO  osp.udf.IntInputUdf [] - ### ### input bytes [54] and num 6. +++ DEBUG +++ duplicated ?
20:41:25,233 INFO  osp.udf.IntInputUdf [] - ### ### input bytes [54] and num 6. +++ DEBUG +++ duplicated ?
3> +I[[54], 6]
20:41:26,204 INFO  osp.udf.IntInputUdf [] - ### ### input bytes [56] and num 8. +++ DEBUG +++ duplicated ?
20:41:26,204 INFO  osp.udf.IntInputUdf [] - ### ### input bytes [56] and num 8. +++ DEBUG +++ duplicated ?
20:41:26,204 INFO  osp.udf.IntInputUdf [] - *** *** input bytes [57] and num 9.
1> +I[[56], 8]
{noformat}
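A note on reading the byte arrays in the log: assuming the UDF encodes the decimal string form of its input, as the `eval` method in the issue description does with `intputNum.toString().getBytes(StandardCharsets.UTF_8)`, each logged byte is simply the UTF-8 (ASCII) code of one character. So `[48]` is "0", `[56]` is "8", and a leading `45` is the minus sign of a negative number. The plain-Java sketch below (no Flink dependency; `BytesDemo` and `toUtf8Digits` are hypothetical names) reproduces that encoding:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

public class BytesDemo {
    // Hypothetical helper mirroring the UDF's encoding step:
    // the decimal string of the number, encoded as UTF-8 bytes.
    static byte[] toUtf8Digits(int num) {
        return Integer.toString(num).getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // Digits '0'..'9' are code points 48..57; '-' is 45.
        for (int num : new int[] {0, 1, 8, -45534429}) {
            System.out.println(num + " -> " + Arrays.toString(toUtf8Digits(num)));
        }
        // 0 -> [48]
        // 1 -> [49]
        // 8 -> [56]
        // -45534429 -> [45, 52, 53, 53, 51, 52, 52, 50, 57]
    }
}
{code}

The last value matches the random-data log line for `num -45534429` in the issue description.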

 

The results in the print table are as expected, but the logging shows that rows which meet the condition (that is, rows for which the UDF returns a non-*null* value) appear to be processed twice.
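The pattern in the log is very regular: odd ids (UDF returns null) are logged once, and even ids (UDF returns bytes) are logged twice. One hypothesis consistent with this, not confirmed here, is that the planner merges the outer `WHERE ET.id_in_bytes IS NOT NULL` into the inner projection. The query then effectively becomes `SELECT IntInputUdf(id), id FROM datagenTable WHERE IntInputUdf(id) IS NOT NULL`, so the UDF is evaluated once in the filter and again in the projection for every row that passes. The single-threaded plain-Java sketch below (no Flink involved; `CallCountSketch`, `udf`, `inlined` and `reused` are hypothetical names) counts calls under that assumed plan shape and contrasts it with evaluating the UDF once per row and reusing the value:

{code:java}
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CallCountSketch {
    // Stand-in for IntInputUdf.eval: UTF-8 digits for even inputs, null for odd ones.
    // Every invocation is counted per input value.
    static byte[] udf(int num, Map<Integer, Integer> calls) {
        calls.merge(num, 1, Integer::sum);
        return num % 2 == 0 ? Integer.toString(num).getBytes(StandardCharsets.UTF_8) : null;
    }

    // Assumed plan shape: the UDF call appears in both the filter condition and
    // the projection, so rows that pass the filter trigger a second call.
    static Map<Integer, Integer> inlined(int rows, List<String> sink) {
        Map<Integer, Integer> calls = new LinkedHashMap<>();
        for (int id = 0; id < rows; id++) {
            if (udf(id, calls) != null) {          // WHERE IntInputUdf(id) IS NOT NULL
                byte[] idInBytes = udf(id, calls); // SELECT IntInputUdf(id) AS id_in_bytes
                sink.add("+I[" + Arrays.toString(idInBytes) + ", " + id + "]");
            }
        }
        return calls;
    }

    // Alternative shape: evaluate the UDF once per row and reuse the result.
    static Map<Integer, Integer> reused(int rows, List<String> sink) {
        Map<Integer, Integer> calls = new LinkedHashMap<>();
        for (int id = 0; id < rows; id++) {
            byte[] idInBytes = udf(id, calls);
            if (idInBytes != null) {
                sink.add("+I[" + Arrays.toString(idInBytes) + ", " + id + "]");
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> inlinedRows = new ArrayList<>();
        List<String> reusedRows = new ArrayList<>();
        // inlined calls: {0=2, 1=1, 2=2, 3=1, 4=2, 5=1, 6=2, 7=1, 8=2, 9=1}
        System.out.println("inlined calls: " + inlined(10, inlinedRows));
        // reused calls:  {0=1, 1=1, 2=1, 3=1, 4=1, 5=1, 6=1, 7=1, 8=1, 9=1}
        System.out.println("reused calls:  " + reused(10, reusedRows));
        // Same sink output either way; only the number of UDF calls differs:
        // true [+I[[48], 0], +I[[50], 2], +I[[52], 4], +I[[54], 6], +I[[56], 8]]
        System.out.println(inlinedRows.equals(reusedRows) + " " + inlinedRows);
    }
}
{code}

If the hypothesis holds, it would explain why the printed rows are correct while side effects such as logging happen twice. It would also explain why a UDF with side effects or internal state could behave differently, as the issue description points out.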

Since I'm not familiar with Flink UDF internals, I couldn't find the cause of this issue.



> UDF randomly processed input data twice 
> ----------------------------------------
>
>                 Key: FLINK-29855
>                 URL: https://issues.apache.org/jira/browse/FLINK-29855
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.4
>            Reporter: Xinyi Yan
>            Priority: Critical
>         Attachments: IntInputUdf.java, SpendReport.java, example.log
>
>
> Local Flink cluster env: 1 task manager and 1 task slot.
> To reproduce the issue:
>  # Create a datagen table with a single INT column, id, that generates 1 row per second.
>  # Create a UDF that only checks the input modulo 2 and contains logging statements.
>  # Create a print table that prints the results.
>  # Insert data from the datagen table into the print table, applying the UDF to the id column.
> The logging shows that some of the data has been processed twice, which I assume is not expected. If data is processed twice, the behavior of the UDF can change completely. I have also attached the main and UDF classes, as well as the log file, for additional info.
>  
> DDL
>  
> {code:java}
> public static void main(String[] args) throws Exception {
>         EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
>
>         TableEnvironment tEnv = TableEnvironment.create(settings);
>
>         tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
>         tEnv.executeSql("CREATE TABLE datagenTable (\n" +
>                 "    id  INT\n" +
>                 ") WITH (\n" +
>                 "    'connector' = 'datagen',\n" +
>                 "    'number-of-rows' = '100',\n" +
>                 "    'rows-per-second' = '1'\n" +
>                 ")");
>         tEnv.executeSql("CREATE TABLE print_table (\n" +
>                 "    id_in_bytes  VARBINARY,\n" +
>                 "    id  INT\n" +
>                 ") WITH (\n" +
>                 "    'connector' = 'print'\n" +
>                 ")");
>         tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL");
> }
> {code}
>  
> UDF
>  
> {code:java}
>   public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer intputNum) {
>     byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
>     if (intputNum % 2 == 0) {
>       LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ", results, intputNum);
>       return results;
>     }
>     LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
>     return null;
>   }
> {code}
> output
>  
>  
> {code:java}
> 2022-11-02 13:38:56,765 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:56,766 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:57,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:57,763 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:58,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:58,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:59,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
> 2022-11-02 13:39:00,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
> 2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:02,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:02,762 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:03,758 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
> 2022-11-02 13:39:04,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
> 2022-11-02 13:39:05,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
> 2022-11-02 13:39:06,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
> 2022-11-02 13:39:07,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:07,763 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:08,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
> 2022-11-02 13:39:09,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
> 2022-11-02 13:39:10,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
> 2022-11-02 13:39:11,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
> 2022-11-02 13:39:12,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835.
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
