Huang Xingbo created FLINK-24860:
------------------------------------
Summary: Fix the wrong position mappings in the Python UDTF
Key: FLINK-24860
URL: https://issues.apache.org/jira/browse/FLINK-24860
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.13.3, 1.12.5
Reporter: Huang Xingbo
Assignee: Huang Xingbo
Fix For: 1.12.6, 1.13.4
The failed example:
{code:python}
@udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
def StoTraceMqSourcePlugUDTF(s: str):
import json
try:
data = json.loads(s)
except Exception as e:
return None
source_code = "trace"
try:
shipment_no = data['shipMentNo']
except Exception as e:
return None
yield source_code, shipment_no
class StoTraceFindNameUDTF(TableFunction):
def eval(self, shipment_no):
yield shipment_no, shipment_no
sto_trace_find_name = udtf(StoTraceFindNameUDTF(),
result_types=[DataTypes.STRING(),
DataTypes.STRING()])
# self.env.set_parallelism(1)
self.t_env.create_temporary_system_function(
"StoTraceMqSourcePlugUDTF", StoTraceMqSourcePlugUDTF)
self.t_env.create_temporary_system_function(
"sto_trace_find_name", sto_trace_find_name
)
source_table = self.t_env.from_elements([(
'{"shipMentNo":"84210186879"}',)],
['biz_context'])
# self.t_env.execute_sql(source_table)
self.t_env.register_table("source_table", source_table)
t = self.t_env.sql_query(
"SELECT biz_context, source_code, shipment_no FROM source_table
LEFT JOIN LATERAL TABLE(StoTraceMqSourcePlugUDTF(biz_context)) as
T(source_code, shipment_no)"
" ON TRUE")
self.t_env.register_table("Table2", t)
t = self.t_env.sql_query(
"SELECT source_code, shipment_no, shipment_name, shipment_type FROM
Table2 LEFT JOIN LATERAL TABLE(sto_trace_find_name(shipment_no)) as
T(shipment_name, shipment_type)"
" ON TRUE"
)
print(t.to_pandas())
{code}
In the failed example, the input arguments of the second Python Table Function
has the wrong positions mapping.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)