[
https://issues.apache.org/jira/browse/FLINK-29855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17628226#comment-17628226
]
luoyuxia commented on FLINK-29855:
----------------------------------
How do you know that some of the data has been processed twice?
The UDF shouldn't process a single row twice; if it does, that would be a
critical bug.
> UDF randomly processed input data twice
> ----------------------------------------
>
> Key: FLINK-29855
> URL: https://issues.apache.org/jira/browse/FLINK-29855
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.14.4
> Reporter: Xinyi Yan
> Priority: Critical
> Attachments: IntInputUdf.java, SpendReport.java, example.log
>
>
> Local flink cluster env:
> 1 task manager and 1 task slot.
> To reproduce the issue:
> # create a datagen table with a single INT column, id, that generates 1 row
> per second.
> # create a UDF that only checks the input modulo 2 and logs it.
> # create a print table that prints the results.
> # insert into the print table the result of applying the UDF to the id column
> of the datagen table.
> The logs show that some of the data has been processed twice, which I assume
> is not expected. In the output below, only even inputs (those for which the
> UDF returns a non-null value) are logged twice; odd inputs are logged once.
> Processing the same row twice would completely change the behavior of the UDF.
> I have also attached the main and UDF classes, as well as the log file, for
> additional info.
>
> DDL
>
> {code:java}
> public static void main(String[] args) throws Exception {
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().build();
>
> TableEnvironment tEnv = TableEnvironment.create(settings);
>
> tEnv.executeSql("CREATE FUNCTION IntInputUdf AS
> 'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
> tEnv.executeSql("CREATE TABLE datagenTable (\n" +
> " id INT\n" +
> ") WITH (\n" +
> " 'connector' = 'datagen',\n" +
> " 'number-of-rows' = '100',\n" +
> " 'rows-per-second' = '1'\n" +
> ")");
> tEnv.executeSql("CREATE TABLE print_table (\n" +
> " id_in_bytes VARBINARY,\n" +
> " id INT\n" +
> ") WITH (\n" +
> " 'connector' = 'print'\n" +
> ")");
> tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT
> IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE
> ET.`id_in_bytes` IS NOT NULL");
> } {code}
>
> UDF
>
> {code:java}
> public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer
> intputNum) {
> byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
> if (intputNum % 2 == 0) {
> LOG.info("### ### input bytes {} and num {}. ### ### DEBUG ### ###
> duplicated call??? ### DEBUG ### ### ", results, intputNum);
> return results;
> }
> LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
> return null;
> } {code}
> output
>
>
> {code:java}
> 2022-11-02 13:38:56,765 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:56,766 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:57,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:57,763 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:58,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:58,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:59,759 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
> 2022-11-02 13:39:00,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
> 2022-11-02 13:39:01,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:01,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:02,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:02,762 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:03,758 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
> 2022-11-02 13:39:04,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
> 2022-11-02 13:39:05,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
> 2022-11-02 13:39:06,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
> 2022-11-02 13:39:07,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:07,763 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:08,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
> 2022-11-02 13:39:09,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
> 2022-11-02 13:39:10,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
> 2022-11-02 13:39:11,759 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
> 2022-11-02 13:39:12,759 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835.
> {code}
>
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)