[
https://issues.apache.org/jira/browse/FLINK-17093?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Hequn Cheng closed FLINK-17093.
-------------------------------
Resolution: Fixed
> Python UDF doesn't work when the input column is from composite field
> ---------------------------------------------------------------------
>
> Key: FLINK-17093
> URL: https://issues.apache.org/jira/browse/FLINK-17093
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 1.10.0
> Reporter: Dian Fu
> Assignee: Dian Fu
> Priority: Blocker
> Labels: pull-request-available
> Fix For: 1.10.1, 1.11.0
>
> Time Spent: 40m
> Remaining Estimate: 0h
>
> For the following job:
> {code}
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import BatchTableEnvironment, StreamTableEnvironment, \
>     EnvironmentSettings, CsvTableSink
> from pyflink.table.descriptors import Schema, Kafka, Json
> from pyflink.table import DataTypes
> from pyflink.table.udf import ScalarFunction, udf
> import os
>
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
>                   DataTypes.STRING()],
>      result_type=DataTypes.STRING())
> def get_host_ip(source, qr, sip, dip):
>     if source == "NGAF" and qr == '1':
>         return dip
>     return sip
>
>
> @udf(input_types=[DataTypes.STRING(), DataTypes.STRING(), DataTypes.STRING(),
>                   DataTypes.STRING()],
>      result_type=DataTypes.STRING())
> def get_dns_server_ip(source, qr, sip, dip):
>     if source == "NGAF" and qr == '1':
>         return sip
>     return dip
>
>
> def test_case():
>     env = StreamExecutionEnvironment.get_execution_environment()
>     env.set_parallelism(1)
>     t_env = StreamTableEnvironment.create(env)
>     from pyflink.table import Row
>
>     # Source table with a composite (ROW) column "data" nested under the top-level schema.
>     table = t_env.from_elements(
>         [("DNS", Row(source="source", devid="devid", sip="sip", dip="dip",
>                      qr="qr", queries="queries", answers="answers", qtypes="qtypes",
>                      atypes="atypes", rcode="rcode", ts="ts"))],
>         DataTypes.ROW([
>             DataTypes.FIELD("stype", DataTypes.STRING()),
>             DataTypes.FIELD("data", DataTypes.ROW([
>                 DataTypes.FIELD("source", DataTypes.STRING()),
>                 DataTypes.FIELD("devid", DataTypes.STRING()),
>                 DataTypes.FIELD("sip", DataTypes.STRING()),
>                 DataTypes.FIELD("dip", DataTypes.STRING()),
>                 DataTypes.FIELD("qr", DataTypes.STRING()),
>                 DataTypes.FIELD("queries", DataTypes.STRING()),
>                 DataTypes.FIELD("answers", DataTypes.STRING()),
>                 DataTypes.FIELD("qtypes", DataTypes.STRING()),
>                 DataTypes.FIELD("atypes", DataTypes.STRING()),
>                 DataTypes.FIELD("rcode", DataTypes.STRING()),
>                 DataTypes.FIELD("ts", DataTypes.STRING())]))]))
>
>     result_file = "/tmp/test.csv"
>     if os.path.exists(result_file):
>         os.remove(result_file)
>     t_env.register_table_sink(
>         "Results",
>         CsvTableSink(['a', 'b', 'c', 'd', 'e', 'f', 'g', 'h', 'i', 'j', 'k', 'l', 'm', 'n'],
>                      [DataTypes.STRING()] * 14,
>                      result_file))
>
>     t_env.register_function("get_host_ip", get_host_ip)
>     t_env.register_function("get_dns_server_ip", get_dns_server_ip)
>     t_env.register_table("source", table)
>
>     # Flatten the composite column, then feed its fields into the Python UDFs.
>     standard_table = t_env.sql_query("select data.*, stype as dns_type from source") \
>         .where("dns_type.in('DNSFULL', 'DNS', 'DNSFULL_FROM_LOG', 'DNS_FROM_LOG')")
>     t_env.register_table("standard_table", standard_table)
>     final_table = t_env.sql_query(
>         "SELECT *, get_host_ip(source, qr, sip, dip) as host_ip, "
>         "get_dns_server_ip(source, qr, sip, dip) as dns_server_ip FROM standard_table")
>     final_table.insert_into("Results")
>     t_env.execute("test")
>
>
> if __name__ == '__main__':
>     test_case()
> {code}
> The generated plan is as follows, which is not correct:
> {code}
> org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: KafkaTableSource(type, data)
>   -> Map
>   -> where: (IN(type, _UTF-16LE'DNSFULL', _UTF-16LE'DNS', _UTF-16LE'DNSFULL_FROM_LOG', _UTF-16LE'DNS_FROM_LOG')), select: (data, type)
>   -> select: (type, get_host_ip(type.source, type.qr, type.sip, type.dip) AS f0, get_dns_server_ip(type.source, type.qr, type.sip, type.dip) AS f1)
>   -> select: (f0.source AS source, f0.devid AS devid, f0.sip AS sip, f0.dip AS dip, f0.qr AS qr, f0.queries AS queries, f0.answers AS answers, f0.qtypes AS qtypes, f0.atypes AS atypes, f0.rcode AS rcode, f0.ts AS ts, type AS dns_type, f0 AS host_ip, f1 AS dns_server_ip)
>   -> to: Row
>   -> Sink: KafkaTableSink(source, devid, sip, dip, qr, queries, answers, qtypes, atypes, rcode, ts, dns_type, host_ip, dns_server_ip) (1/4) (8d064ab137866a2a9040392a87bcc59d) switched from RUNNING to FAILED.
> {code}
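> The essential pattern can be reduced to a much smaller job: a Python UDF whose argument is taken from a field of a composite (ROW) column. The sketch below only illustrates that pattern; the names echo, src, a and b are made up and not part of the job above:
> {code}
> # Illustrative sketch only: a Python UDF fed from a field of a composite (ROW) column.
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.table import StreamTableEnvironment, DataTypes, Row
> from pyflink.table.udf import udf
>
>
> @udf(input_types=[DataTypes.STRING()], result_type=DataTypes.STRING())
> def echo(s):
>     return s
>
>
> env = StreamExecutionEnvironment.get_execution_environment()
> t_env = StreamTableEnvironment.create(env)
> t_env.register_function("echo", echo)
>
> # "data" is the composite column; the UDF argument comes from data.a.
> t = t_env.from_elements(
>     [("outer", Row(a="a1", b="b1"))],
>     DataTypes.ROW([
>         DataTypes.FIELD("stype", DataTypes.STRING()),
>         DataTypes.FIELD("data", DataTypes.ROW([
>             DataTypes.FIELD("a", DataTypes.STRING()),
>             DataTypes.FIELD("b", DataTypes.STRING())]))]))
> t_env.register_table("src", t)
>
> # Passing a field of the composite column into the Python UDF is the pattern this issue is about.
> result = t_env.sql_query("SELECT echo(data.a) AS a_copy FROM src")
> {code}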