[
https://issues.apache.org/jira/browse/FLINK-25332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17460506#comment-17460506
]
Dian Fu commented on FLINK-25332:
---------------------------------
Just posting the plan you shared:
{code:java}
LegacySink(name=[`default_catalog`.`default_database`.`algorithmsink`], fields=[disresult])
+- PythonCalc(select=[writeData(line, calculationStatusMap, gathertime, rawData, terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, timestamp) AS dis_result])
   +- Calc(select=[line, calculationStatusMap, gathertime, rawData, terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, timestamp], where=[(f0 > 0.0:DECIMAL(2, 1))])
      +- PythonCalc(select=[line, calculationStatusMap, gathertime, rawData, terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, timestamp, writeData(line, calculationStatusMap, gathertime, rawData, terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, timestamp) AS f0])
         +- LegacyTableSourceScan(table=[[default_catalog, default_database, mysource, source: [KafkaTableSource(line, calculationStatusMap, gathertime, rawData, terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, timestamp)]]], fields=[line, calculationStatusMap, gathertime, rawData, terminal, deviceID, recievetime, car, sendtime, baseInfoMap, workStatusMap, timestamp])
{code}
There are two PythonCalc nodes in the plan, so the UDF writeData is evaluated
multiple times for the same input row. This doesn't matter when the UDF is
deterministic; however, your UDF is non-deterministic, and so you got
unexpected results.
I think this duplicated evaluation is definitely unexpected. cc [~hxbks2ks]
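As a side note, PyFlink's udf decorator accepts a deterministic flag, which
tells the planner that the function may return different results for the same
input. A minimal sketch of how such a UDF could be declared, reusing the field
names from the plan above (the function body is a placeholder, not the
reporter's actual code):
{code:python}
from pyflink.table import DataTypes
from pyflink.table.udf import udf

# deterministic=False declares that the function may return different
# results for the same input, so the planner should not assume its
# calls are interchangeable. The signature mirrors the writeData call
# in the plan above; the body is a placeholder.
@udf(result_type=DataTypes.FLOAT(), deterministic=False)
def write_data(line, calculationStatusMap, gathertime, rawData,
               terminal, deviceID, recievetime, car, sendtime,
               baseInfoMap, workStatusMap, timestamp):
    # side-effecting / stateful logic elided
    return 0.0
{code}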
> When using Pyflink Table API, 'where' clause seems to work incorrectly
> ----------------------------------------------------------------------
>
> Key: FLINK-25332
> URL: https://issues.apache.org/jira/browse/FLINK-25332
> Project: Flink
> Issue Type: Bug
> Components: Table SQL / API
> Affects Versions: 1.13.0
> Environment: Python 3.6.9, PyFlink 1.13.0, kafka_2.12-2.4.0
> Reporter: TongMeng
> Priority: Major
> Attachments: sql_explain.txt
>
>
> The UDF I used just returns a float: for the first four input records it
> returns 1.0, 2.0, 3.0 and 4.0, and after that it returns 0.0. I use a
> 'where' clause in the SQL to filter out the 0.0 results, so the expected
> output in Kafka should be 1.0, 2.0, 3.0 and 4.0. However, the Kafka
> consumer receives four 0.0 values.
> The SQL is as follows:
> "insert into algorithmsink select dt.my_result from (select udf1(a) AS
> my_result from mysource) AS dt where dt.my_result > 0.0" (udf1 is my UDF)
> After I removed the 'where dt.my_result > 0.0' part, it worked well: Kafka
> gave 1.0, 2.0, 3.0, 4.0, 0.0, 0.0, 0.0, ...
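For completeness, a minimal sketch of what the reported job could look like.
The body of udf1 is a guess based on the description above (a stateful
counter, hence non-deterministic), and the Kafka DDL for mysource and
algorithmsink is elided:
{code:python}
from pyflink.table import DataTypes, EnvironmentSettings, TableEnvironment
from pyflink.table.udf import udf

# Guessed reconstruction of udf1: a stateful counter, so the same input
# row can yield a different result on every call (non-deterministic).
state = {"count": 0}

@udf(result_type=DataTypes.FLOAT(), deterministic=False)
def udf1(a):
    state["count"] += 1
    return float(state["count"]) if state["count"] <= 4 else 0.0

t_env = TableEnvironment.create(
    EnvironmentSettings.new_instance().in_streaming_mode().build())
t_env.create_temporary_function("udf1", udf1)

# With the reporter's Kafka tables registered (DDL elided here), the
# failing query is the one from the report. Because the plan evaluates
# udf1 once for the WHERE predicate and again for the projection, the
# value that passes the filter is not the value written to the sink:
#   t_env.execute_sql(
#       "insert into algorithmsink select dt.my_result "
#       "from (select udf1(a) AS my_result from mysource) AS dt "
#       "where dt.my_result > 0.0")
{code}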