[
https://issues.apache.org/jira/browse/FLINK-29855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17628327#comment-17628327
]
SenBin Lin edited comment on FLINK-29855 at 11/3/22 1:33 PM:
-------------------------------------------------------------
Hi, just for information: I modified the program provided by [~yanxinyi] and
changed the generator kind of the field `id` from *random* (the default) to
*sequence*. The datagen table DDL is as follows:
tEnv.executeSql(
    String.format(
        "create table %s (\n"
            + "id INT \n"
            + ") with (\n"
            + "'connector' = 'datagen',\n"
            + "'number-of-rows' = '10',\n"
            + "'rows-per-second' = '1',\n"
            + "'fields.id.kind' = 'sequence',\n"
            + "'fields.id.start' = '0',\n"
            + "'fields.id.end' = '100'\n"
            + ")",
        "datagenTable"));
The rest of the code is basically unchanged. I first examined the records
in the datagen table.
tEnv.executeSql("select `id` from datagenTable")
.print();
The output is as expected, as shown below.
+----+-------------+
| op |          id |
+----+-------------+
| +I |           0 |
| +I |           1 |
| +I |           2 |
| +I |           3 |
| +I |           7 |
| +I |           6 |
| +I |           5 |
| +I |           4 |
| +I |           9 |
| +I |           8 |
The key output log of the main program is shown below.
20:41:24,205 INFO osp.udf.IntInputUdf [] - *** *** input bytes [49] and num 1.
20:41:24,207 INFO osp.udf.IntInputUdf [] - ### ### input bytes [48] and num 0. +++ DEBUG +++ duplicated ?
20:41:24,207 INFO osp.udf.IntInputUdf [] - ### ### input bytes [48] and num 0. +++ DEBUG +++ duplicated ?
1> +I[[48], 0]
20:41:24,233 INFO osp.udf.IntInputUdf [] - ### ### input bytes [50] and num 2. +++ DEBUG +++ duplicated ?
20:41:24,233 INFO osp.udf.IntInputUdf [] - ### ### input bytes [50] and num 2. +++ DEBUG +++ duplicated ?
3> +I[[50], 2]
20:41:24,233 INFO osp.udf.IntInputUdf [] - *** *** input bytes [51] and num 3.
20:41:25,204 INFO osp.udf.IntInputUdf [] - ### ### input bytes [52] and num 4. +++ DEBUG +++ duplicated ?
20:41:25,204 INFO osp.udf.IntInputUdf [] - ### ### input bytes [52] and num 4. +++ DEBUG +++ duplicated ?
1> +I[[52], 4]
20:41:25,204 INFO osp.udf.IntInputUdf [] - *** *** input bytes [53] and num 5.
20:41:25,232 INFO osp.udf.IntInputUdf [] - *** *** input bytes [55] and num 7.
20:41:25,233 INFO osp.udf.IntInputUdf [] - ### ### input bytes [54] and num 6. +++ DEBUG +++ duplicated ?
20:41:25,233 INFO osp.udf.IntInputUdf [] - ### ### input bytes [54] and num 6. +++ DEBUG +++ duplicated ?
3> +I[[54], 6]
20:41:26,204 INFO osp.udf.IntInputUdf [] - ### ### input bytes [56] and num 8. +++ DEBUG +++ duplicated ?
20:41:26,204 INFO osp.udf.IntInputUdf [] - ### ### input bytes [56] and num 8. +++ DEBUG +++ duplicated ?
20:41:26,204 INFO osp.udf.IntInputUdf [] - *** *** input bytes [57] and num 9.
1> +I[[56], 8]
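To check the pattern in a log like this mechanically rather than by eye, the UDF calls can be tallied per input number. The following is only a rough sketch: the class name `UdfCallTally` and the regular expression are assumptions made for illustration, and the sample input is limited to the first few lines of the excerpt above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the attached program.
class UdfCallTally {
    // Matches the "and num <n>." fragment that the UDF logs on every call.
    private static final Pattern NUM = Pattern.compile("and num (-?\\d+)\\.");

    /** Counts how many UDF log lines mention each input number. */
    static Map<Integer, Integer> countCalls(String log) {
        Map<Integer, Integer> calls = new LinkedHashMap<>();
        Matcher m = NUM.matcher(log);
        while (m.find()) {
            calls.merge(Integer.parseInt(m.group(1)), 1, Integer::sum);
        }
        return calls;
    }

    public static void main(String[] args) {
        // First lines of the log excerpt; print-sink rows such as "1> +I[[48], 0]" are ignored.
        String log = String.join("\n",
            "[] - *** *** input bytes [49] and num 1.",
            "[] - ### ### input bytes [48] and num 0. +++ DEBUG +++ duplicated ?",
            "[] - ### ### input bytes [48] and num 0. +++ DEBUG +++ duplicated ?",
            "1> +I[[48], 0]",
            "[] - ### ### input bytes [50] and num 2. +++ DEBUG +++ duplicated ?",
            "[] - ### ### input bytes [50] and num 2. +++ DEBUG +++ duplicated ?",
            "3> +I[[50], 2]",
            "[] - *** *** input bytes [51] and num 3.");
        System.out.println(countCalls(log)); // prints {1=1, 0=2, 2=2, 3=1}
    }
}
```

In this sample, even inputs (which pass the filter) are logged twice and odd inputs once.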
The results in the print table are as expected. However, the log shows that
rows that meet the condition (that is, rows whose result value is not *null*)
may be processed twice.
Since I'm not familiar with the Flink UDF internals, I can't find the cause of
this issue.
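One possible explanation, which nothing in this comment confirms, is that the UDF call is evaluated once for the `IS NOT NULL` predicate and once more for the projected column, without the first result being reused. The plain-Java sketch below does not use any Flink API; `DoubleEvalSketch`, `eval` and `filterThenProject` are hypothetical names that only imitate that evaluation order to show it would produce the same call pattern.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical simulation; it does not reflect Flink's actual generated code.
class DoubleEvalSketch {
    // Records every call, standing in for the LOG.info statements in the UDF.
    static final List<Integer> calls = new ArrayList<>();

    // Stand-in for IntInputUdf.eval: bytes for even input, null for odd input.
    static byte[] eval(int num) {
        calls.add(num);
        return num % 2 == 0
            ? Integer.toString(num).getBytes(StandardCharsets.UTF_8)
            : null;
    }

    // Assumed evaluation order: predicate first, then projection, no result reuse.
    static List<byte[]> filterThenProject(int[] ids) {
        List<byte[]> out = new ArrayList<>();
        for (int id : ids) {
            if (eval(id) != null) {   // WHERE id_in_bytes IS NOT NULL
                out.add(eval(id));    // SELECT IntInputUdf(id) AS id_in_bytes
            }
        }
        return out;
    }

    public static void main(String[] args) {
        filterThenProject(new int[] {0, 1, 2, 3});
        System.out.println(calls); // prints [0, 0, 1, 2, 2, 3]
    }
}
```

Under this assumption, only rows that pass the filter trigger a second call, which matches the log: even ids appear twice and odd ids once.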
(Author: rovo98)
> UDF randomly processed input data twice
> ----------------------------------------
>
> Key: FLINK-29855
> URL: https://issues.apache.org/jira/browse/FLINK-29855
> Project: Flink
> Issue Type: Bug
> Affects Versions: 1.14.4
> Reporter: Xinyi Yan
> Priority: Critical
> Attachments: IntInputUdf.java, SpendReport.java, example.log
>
>
> Local flink cluster env:
> 1 task manager and 1 task slot.
> To reproduce the issue:
> # create a datagen table with a single INT column, id, generating 1 row per
> second.
> # create a UDF that applies a modulo check to its input and logs each call.
> # create a print table that prints the results.
> # insert data into the print table from the datagen table, applying the UDF to
> the id column.
> The logging shows that some of the data has been processed twice, which I
> believe is not expected. If data is processed twice, the behavior of the UDF
> changes completely. I have also attached the main and UDF classes, as well
> as the log file, for additional info.
>
> DDL
>
> {code:java}
> public static void main(String[] args) throws Exception {
> EnvironmentSettings settings =
> EnvironmentSettings.newInstance().build();
>
> TableEnvironment tEnv = TableEnvironment.create(settings);
>
> tEnv.executeSql("CREATE FUNCTION IntInputUdf AS "
>     + "'org.apache.flink.playgrounds.spendreport.IntInputUdf'");
> tEnv.executeSql("CREATE TABLE datagenTable (\n" +
> " id INT\n" +
> ") WITH (\n" +
> " 'connector' = 'datagen',\n" +
> " 'number-of-rows' = '100',\n" +
> " 'rows-per-second' = '1'\n" +
> ")");
> tEnv.executeSql("CREATE TABLE print_table (\n" +
> " id_in_bytes VARBINARY,\n" +
> " id INT\n" +
> ") WITH (\n" +
> " 'connector' = 'print'\n" +
> ")");
> tEnv.executeSql("INSERT INTO print_table SELECT * FROM ( SELECT "
>     + "IntInputUdf(`id`) AS `id_in_bytes`, `id` FROM datagenTable ) AS ET WHERE "
>     + "ET.`id_in_bytes` IS NOT NULL");
> } {code}
>
> UDF
>
> {code:java}
> public @DataTypeHint("Bytes") byte[] eval(@DataTypeHint("INT") Integer
> intputNum) {
> byte[] results = intputNum.toString().getBytes(StandardCharsets.UTF_8);
> if (intputNum % 2 == 0) {
> LOG.info("### ### input bytes {} and num {}. ### ### DEBUG ### ### "
>     + "duplicated call??? ### DEBUG ### ### ", results, intputNum);
> return results;
> }
> LOG.info("*** *** input bytes {} and num {}.", results, intputNum);
> return null;
> } {code}
> output
>
>
> {code:java}
> 2022-11-02 13:38:56,765 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:56,766 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 51, 50, 52, 56, 51, 54, 53, 48, 50] and num -1324836502.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:57,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:57,763 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [49, 48, 56, 53, 52, 53, 54, 53, 52, 50] and num 1085456542.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:58,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:58,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [49, 53, 48, 54, 51, 49, 49, 57, 53, 52] and num 1506311954.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:38:59,759 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [45, 49, 56, 48, 48, 54, 57, 48, 52, 51, 55] and num -1800690437.
> 2022-11-02 13:39:00,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [49, 52, 50, 56, 56, 55, 55, 52, 56, 51] and num 1428877483.
> 2022-11-02 13:39:01,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:01,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 55, 57, 52, 50, 54, 51, 54, 56, 54] and num -1794263686.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:02,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:02,762 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 49, 54, 54, 56, 57, 56, 53, 52, 50] and num -1166898542.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:03,758 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [45, 49, 54, 54, 51, 53, 49, 53, 55, 53, 51] and num -1663515753.
> 2022-11-02 13:39:04,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [45, 52, 53, 53, 51, 52, 52, 50, 57] and num -45534429.
> 2022-11-02 13:39:05,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [49, 50, 55, 48, 55, 50, 52, 52, 57] and num 127072449.
> 2022-11-02 13:39:06,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [45, 52, 53, 51, 55, 48, 53, 54, 48, 55] and num -453705607.
> 2022-11-02 13:39:07,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:07,763 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - ### ###
> input bytes [45, 49, 48, 57, 53, 57, 48, 56, 51, 50, 54] and num -1095908326.
> ### ### DEBUG ### ### duplicated call??? ### DEBUG ### ###
> 2022-11-02 13:39:08,760 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [45, 49, 54, 50, 55, 53, 57, 55, 52, 49, 55] and num -1627597417.
> 2022-11-02 13:39:09,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [53, 57, 54, 53, 50, 48, 53, 48, 49] and num 596520501.
> 2022-11-02 13:39:10,761 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [49, 51, 54, 49, 49, 54, 50, 56, 52, 51] and num 1361162843.
> 2022-11-02 13:39:11,759 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [50, 48, 52, 56, 48, 53, 49, 55, 57, 49] and num 2048051791.
> 2022-11-02 13:39:12,759 INFO
> org.apache.flink.playgrounds.spendreport.IntInputUdf [] - *** ***
> input bytes [45, 51, 48, 54, 54, 48, 51, 56, 51, 53] and num -306603835.
> {code}
>
>