Dian Fu created FLINK-34985:
-------------------------------
Summary: It doesn't support to access fields by name for map
function in thread mode
Key: FLINK-34985
URL: https://issues.apache.org/jira/browse/FLINK-34985
Project: Flink
Issue Type: Bug
Components: API / Python
Reporter: Dian Fu
Reported in the Slack channel:
[https://apache-flink.slack.com/archives/C065944F9M2/p1711640068929589]
Hi all, I seem to be running into an issue when switching to thread mode in
PyFlink. In a UDF the {{Row}} seems to get converted into a tuple, and you
can no longer access fields by name. In process mode it works fine. This
bug can easily be reproduced using this minimal example, which is close to the
PyFlink docs:
```
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.common import Row
from pyflink.table import StreamTableEnvironment, DataTypes
from pyflink.table.udf import udf

env = StreamExecutionEnvironment.get_execution_environment()
t_env = StreamTableEnvironment.create(env)
t_env.get_config().set("parallelism.default", "1")

# This does work:
t_env.get_config().set("python.execution-mode", "process")
# This doesn't work:
# t_env.get_config().set("python.execution-mode", "thread")


def map_function(a: Row) -> Row:
    # Accessing fields by name fails in thread mode, where `a` arrives as a tuple
    return Row(a.a + 1, a.b * a.b)


# map operation with a python general scalar function
func = udf(
    map_function,
    result_type=DataTypes.ROW(
        [
            DataTypes.FIELD("a", DataTypes.BIGINT()),
            DataTypes.FIELD("b", DataTypes.BIGINT()),
        ]
    ),
)

table = (
    t_env.from_elements(
        [(2, 4), (0, 0)],
        schema=DataTypes.ROW(
            [
                DataTypes.FIELD("a", DataTypes.BIGINT()),
                DataTypes.FIELD("b", DataTypes.BIGINT()),
            ]
        ),
    )
    .map(func)
    .alias("a", "b")
    .execute()
    .print()
)
```
The exception I get in this execution mode is:
2024-03-28 16:32:10 Caused by: pemja.core.PythonException: <class 'AttributeError'>: 'tuple' object has no attribute 'a'
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/embedded/operation_utils.process_element(operation_utils.py:72)
2024-03-28 16:32:10 at /usr/local/lib/python3.10/dist-packages/pyflink/fn_execution/table/operations.process_element(operations.py:102)
2024-03-28 16:32:10 at <string>.<lambda>(<string>:1)
2024-03-28 16:32:10 at /opt/flink/wouter/minimal_example.map_function(minimal_example.py:19)
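
The failure mode in the trace can be illustrated without PyFlink. The sketch below is only an illustration: NamedRow is a hypothetical stand-in for a {{Row}} that keeps its field names, and map_by_position is a hypothetical variant of map_function that reads fields by index. Whether positional access is an acceptable workaround in an actual PyFlink job is an assumption and is not confirmed by this report.

```python
from collections import namedtuple

# Hypothetical stand-in for a Row that keeps its field names "a" and "b".
NamedRow = namedtuple("NamedRow", ["a", "b"])


def map_by_name(row):
    # Mirrors map_function from the report: relies on attribute access.
    return (row.a + 1, row.b * row.b)


def map_by_position(row):
    # Hypothetical variant: indexing works for a named row and a plain tuple.
    return (row[0] + 1, row[1] * row[1])


named = NamedRow(2, 4)  # input with field names preserved
plain = (2, 4)          # input as a plain tuple, as described for thread mode

print(map_by_name(named))
print(map_by_position(named))
print(map_by_position(plain))

try:
    map_by_name(plain)
except AttributeError as exc:
    # Same kind of error as in the reported stack trace.
    print(exc)
```

Positional access gives the same result for both input shapes, while access by name raises AttributeError as soon as the input is a plain tuple.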