[ https://issues.apache.org/jira/browse/FLINK-29855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17628515#comment-17628515 ]

Xinyi Yan commented on FLINK-29855:
-----------------------------------

I added a new UDF that randomly returns either null or bytes, and the query produced incorrect results. First, the UDF was randomly called twice for the same input row. Second, the WHERE clause did not honor `IS NOT NULL`: rows with a null `id_in_bytes` still reached the print table. This is definitely a big problem.

Datagen: sequence from 1 to 5.

 

Query
{code:sql}
INSERT INTO print_table
SELECT * FROM (
    SELECT RandomUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable
) AS ET
WHERE ET.`id_in_bytes` IS NOT NULL {code}
Result: 
{code:java}
+I[null, 1] 
+I[[50], 2] 
+I[null, 4] {code}
UDF
{code:java}
public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer inputNum) {
    byte[] results = inputNum.toString().getBytes(StandardCharsets.UTF_8);
    // Random integer in [1, 9]: an even value returns the bytes, an odd value returns null.
    int randomNumber = ((int) (Math.random() * (10 - 1))) + 1;
    LOG.info("[*][*][*] input num is {} and random number is {}. [*][*][*]", inputNum, randomNumber);
    if (randomNumber % 2 == 0) {
      LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ", results, inputNum);
      return results;
    }
    LOG.info("*** *** input bytes {} and num {}.", results, inputNum);
    return null;
  } {code}
Log:
{code:java}
2022-11-03 12:04:54,018 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - [*][*][*] input num is 1 and random number is 4. [*][*][*]
2022-11-03 12:04:54,018 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49] and num 1.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-03 12:04:54,019 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - [*][*][*] input num is 1 and random number is 7. [*][*][*]
2022-11-03 12:04:54,019 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49] and num 1.
2022-11-03 12:04:55,018 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - [*][*][*] input num is 2 and random number is 6. [*][*][*]
2022-11-03 12:04:55,020 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [50] and num 2.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-03 12:04:55,020 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - [*][*][*] input num is 2 and random number is 4. [*][*][*]
2022-11-03 12:04:55,021 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [50] and num 2.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-03 12:04:56,014 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - [*][*][*] input num is 3 and random number is 9. [*][*][*]
2022-11-03 12:04:56,015 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [51] and num 3.
2022-11-03 12:04:57,014 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - [*][*][*] input num is 4 and random number is 2. [*][*][*]
2022-11-03 12:04:57,015 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [52] and num 4.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
2022-11-03 12:04:57,015 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - [*][*][*] input num is 4 and random number is 7. [*][*][*]
2022-11-03 12:04:57,015 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [52] and num 4.
2022-11-03 12:04:58,017 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - [*][*][*] input num is 5 and random number is 7. [*][*][*]
2022-11-03 12:04:58,018 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [53] and num 5. {code}
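The printed result lines up with the log if the `IS NOT NULL` condition calls the UDF once and the projection calls it a second time, only for rows that passed the condition. For id 1 the first call drew 4 (bytes, so the row passes) and the second drew 7 (null), so `+I[null, 1]` was emitted; id 4 behaves the same way; ids 3 and 5 were filtered out after a single call. This reading is an assumption inferred from the log, not taken from Flink's generated code. The plain-Java sketch below (no Flink dependency) replays the random numbers from the log through that assumed evaluation order. `DuplicatedCallSketch`, `replayingUdf`, `duplicatedCalls` and `singleCall` are hypothetical names used only for illustration; `singleCall` shows, for contrast, what calling the UDF once per row and reusing the value would look like.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;
import java.util.function.Function;

public class DuplicatedCallSketch {

    // Hypothetical stand-in for RandomUdf. Instead of Math.random(), it replays a
    // fixed sequence of "random" numbers so that the run is reproducible.
    static Function<Integer, byte[]> replayingUdf(Deque<Integer> randomNumbers) {
        return inputNum -> {
            int randomNumber = randomNumbers.removeFirst();
            // Even number: return the UTF-8 bytes of the input; odd number: return null.
            return randomNumber % 2 == 0
                    ? inputNum.toString().getBytes(StandardCharsets.UTF_8)
                    : null;
        };
    }

    // Formats a row the way the print sink shows it, e.g. +I[[50], 2].
    static String row(byte[] idInBytes, int id) {
        return "+I[" + (idInBytes == null ? "null" : Arrays.toString(idInBytes)) + ", " + id + "]";
    }

    // Assumed evaluation order: the WHERE condition calls the UDF, and only rows
    // that pass call it a second time to compute the projected column.
    static List<String> duplicatedCalls(List<Integer> ids, Function<Integer, byte[]> udf) {
        List<String> out = new ArrayList<>();
        for (int id : ids) {
            if (udf.apply(id) != null) {         // WHERE RandomUdf(id) IS NOT NULL
                out.add(row(udf.apply(id), id)); // SELECT RandomUdf(id) AS id_in_bytes, id
            }
        }
        return out;
    }

    // For contrast: call the UDF once per row and reuse the value for both the
    // condition and the projection, so a null can never be emitted.
    static List<String> singleCall(List<Integer> ids, Function<Integer, byte[]> udf) {
        List<String> out = new ArrayList<>();
        for (int id : ids) {
            byte[] idInBytes = udf.apply(id);
            if (idInBytes != null) {
                out.add(row(idInBytes, id));
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> ids = List.of(1, 2, 3, 4, 5);
        // The random numbers in the order they appear in the log above.
        Deque<Integer> logged = new ArrayDeque<>(List.of(4, 7, 6, 4, 9, 2, 7, 7));
        System.out.println(duplicatedCalls(ids, replayingUdf(logged)));
    }
}
```

Running `main` prints `[+I[null, 1], +I[[50], 2], +I[null, 4]]`, the same three rows as the result above. Feeding the same sequence to `singleCall` instead never emits a null `id_in_bytes`.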
 

> UDF randomly processed input data twice 
> ----------------------------------------
>
>                 Key: FLINK-29855
>                 URL: https://issues.apache.org/jira/browse/FLINK-29855
>             Project: Flink
>          Issue Type: Bug
>    Affects Versions: 1.14.4
>            Reporter: Xinyi Yan
>            Priority: Critical
>         Attachments: IntInputUdf.java, SpendReport.java, example.log
>
>
> Local Flink cluster env:
> 1 task manager and 1 task slot.
> To reproduce the issue:
>  # Create a datagen table with a single INT column, id, producing 1 row per second.
>  # Create a UDF that only applies a mod-2 check to the input data and logs what it does.
>  # Create a print table that prints the results.
>  # Insert into the print table the result of applying the UDF to the id column of the datagen table.
> The logs show that some of the input rows were processed twice, which I don't think is expected. If data is processed twice, the behavior of the UDF can change completely. I also attached the main and UDF classes, as well as the log file, for additional info.
>  
> DDL
>  
> {code:java}
> public static void main(String[] args) throws Exception {
>     EnvironmentSettings settings = EnvironmentSettings.newInstance().build();
>
>     TableEnvironment tEnv = TableEnvironment.create(settings);
>
>     tEnv.executeSql("CREATE FUNCTION IntInputUdf AS 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
>     tEnv.executeSql("CREATE TABLE datagenTable (\n" +
>             "    id  INT\n" +
>             ") WITH (\n" +
>             "    'connector' = 'datagen',\n" +
>             "    'number-of-rows' = '100',\n" +
>             "    'rows-per-second' = '1'\n" +
>             ")");
>     tEnv.executeSql("CREATE TABLE print_table (\n" +
>             "    id_in_bytes  VARBINARY,\n" +
>             "    id  INT\n" +
>             ") WITH (\n" +
>             "    'connector' = 'print'\n" +
>             ")");
>     tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE ET.`id_in_bytes` IS NOT NULL");
> } {code}
>  
> UDF
>  
> {code:java}
> public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer inputNum) {
>     byte[] results = inputNum.toString().getBytes(StandardCharsets.UTF_8);
>     // Even inputs return their UTF-8 bytes; odd inputs return null.
>     if (inputNum % 2 == 0) {
>       LOG.info("### ### input bytes {} and num {}.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ### ", results, inputNum);
>       return results;
>     }
>     LOG.info("*** *** input bytes {} and num {}.", results, inputNum);
>     return null;
>   } {code}
> output
>  
>  
> {code:java}
> 2022-11-02 13:38:56,765 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:56,766 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:57,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:57,763 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:58,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:58,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:38:59,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
> 2022-11-02 13:39:00,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
> 2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:01,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:02,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:02,762 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:03,758 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
> 2022-11-02 13:39:04,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
> 2022-11-02 13:39:05,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
> 2022-11-02 13:39:06,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
> 2022-11-02 13:39:07,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:07,763 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - ### ### input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.   ### ### DEBUG ### ### duplicated call??? ### DEBUG  ### ###
> 2022-11-02 13:39:08,760 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
> 2022-11-02 13:39:09,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
> 2022-11-02 13:39:10,761 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
> 2022-11-02 13:39:11,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
> 2022-11-02 13:39:12,759 INFO  org.apache.flink.playgrounds.spendreport.IntInputUdf         [] - *** *** input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835.
> {code}
>  
>  
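The log in the quoted report shows the same pattern with a deterministic UDF: every even input (the `###` lines) is logged twice and every odd input (the `***` lines) is logged once. A minimal plain-Java sketch (no Flink dependency), assuming the `IS NOT NULL` condition calls the UDF once and the projection calls it again only for rows that pass, counts the calls per input for the first six numbers in that log. `CallCountSketch`, `eval` and `run` are hypothetical names for illustration, not Flink's generated code.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CallCountSketch {

    // Hypothetical counter: how many times the UDF was invoked per input value.
    private final Map<Integer, Integer> calls = new LinkedHashMap<>();

    // Mirrors the quoted IntInputUdf logic: even inputs return their UTF-8 bytes,
    // odd inputs (including negative odd ones, where % 2 is -1) return null.
    byte[] eval(Integer inputNum) {
        calls.merge(inputNum, 1, Integer::sum);
        return inputNum % 2 == 0 ? inputNum.toString().getBytes(StandardCharsets.UTF_8) : null;
    }

    // Assumed evaluation order: the IS NOT NULL condition calls eval once, and rows
    // that pass call it again for the projected id_in_bytes column.
    Map<Integer, Integer> run(List<Integer> ids) {
        for (Integer id : ids) {
            if (eval(id) != null) {
                eval(id); // projection: second call for the same row
            }
        }
        return calls;
    }

    public static void main(String[] args) {
        // First six input numbers from the quoted output log.
        List<Integer> ids = List.of(-1324836502, 1085456542, 1506311954,
                -1800690437, 1428877483, -1794263686);
        System.out.println(new CallCountSketch().run(ids));
    }
}
```

It prints `{-1324836502=2, 1085456542=2, 1506311954=2, -1800690437=1, 1428877483=1, -1794263686=2}`, matching the doubled and single log lines. Because this UDF is deterministic, the second call returns the same value and the printed rows are still correct; only side effects such as logging are duplicated.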



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
