[
https://issues.apache.org/jira/browse/FLINK-24860?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
ASF GitHub Bot updated FLINK-24860:
-----------------------------------
Labels: pull-request-available (was: )
> Fix the wrong position mappings in the Python UDTF
> --------------------------------------------------
>
> Key: FLINK-24860
> URL: https://issues.apache.org/jira/browse/FLINK-24860
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.12.5, 1.13.3
> Reporter: Huang Xingbo
> Assignee: Huang Xingbo
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.12.6, 1.13.4
>
>
> The failed example:
> {code:python}
> @udtf(result_types=[DataTypes.STRING(), DataTypes.STRING()])
> def StoTraceMqSourcePlugUDTF(s: str):
> import json
> try:
> data = json.loads(s)
> except Exception as e:
> return None
> source_code = "trace"
> try:
> shipment_no = data['shipMentNo']
> except Exception as e:
> return None
> yield source_code, shipment_no
> class StoTraceFindNameUDTF(TableFunction):
> def eval(self, shipment_no):
> yield shipment_no, shipment_no
> sto_trace_find_name = udtf(StoTraceFindNameUDTF(),
> result_types=[DataTypes.STRING(),
> DataTypes.STRING()])
> # self.env.set_parallelism(1)
> self.t_env.create_temporary_system_function(
> "StoTraceMqSourcePlugUDTF", StoTraceMqSourcePlugUDTF)
> self.t_env.create_temporary_system_function(
> "sto_trace_find_name", sto_trace_find_name
> )
> source_table = self.t_env.from_elements([(
> '{"shipMentNo":"84210186879"}',)],
> ['biz_context'])
> # self.t_env.execute_sql(source_table)
> self.t_env.register_table("source_table", source_table)
> t = self.t_env.sql_query(
> "SELECT biz_context, source_code, shipment_no FROM source_table
> LEFT JOIN LATERAL TABLE(StoTraceMqSourcePlugUDTF(biz_context)) as
> T(source_code, shipment_no)"
> " ON TRUE")
> self.t_env.register_table("Table2", t)
> t = self.t_env.sql_query(
> "SELECT source_code, shipment_no, shipment_name, shipment_type
> FROM Table2 LEFT JOIN LATERAL TABLE(sto_trace_find_name(shipment_no)) as
> T(shipment_name, shipment_type)"
> " ON TRUE"
> )
> print(t.to_pandas())
> {code}
> In the failed example, the input arguments of the second Python Table
> Function has the wrong positions mapping.
--
This message was sent by Atlassian Jira
(v8.20.1#820001)