Elkhan Dadashov created FLINK-33188:
---------------------------------------
Summary: PyFlink MapState with Types.ROW() throws exception
Key: FLINK-33188
URL: https://issues.apache.org/jira/browse/FLINK-33188
Project: Flink
Issue Type: Bug
Components: API / Python, API / Type Serialization System
Affects Versions: 1.17.1
Reporter: Elkhan Dadashov
I'm trying to use MapState where the value is a list of elements of type <class 'pyflink.common.types.Row'>.
I wanted to check whether anyone else has run into the same issue when using MapState in PyFlink with complex types.
Here is the code:
from pyflink.common import Time
from pyflink.common.typeinfo import Types
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.functions import (
    KeyedCoProcessFunction,
    KeySelector,
    RuntimeContext,
)
from pyflink.datastream.state import (
    MapStateDescriptor,
    StateTtlConfig,
    ValueStateDescriptor,
    ListStateDescriptor,
)
from pyflink.table import DataTypes, StreamTableEnvironment


class MyKeyedCoProcessFunction(KeyedCoProcessFunction):

    def __init__(self):
        self.my_map_state = None

    def open(self, runtime_context: RuntimeContext):
        state_ttl_config = (
            StateTtlConfig.new_builder(Time.seconds(1))
            .set_update_type(StateTtlConfig.UpdateType.OnReadAndWrite)
            .disable_cleanup_in_background()
            .build()
        )
        # Map key: SQL timestamp; map value: list of 12-field rows
        my_map_state_descriptor = MapStateDescriptor(
            "my_map_state",
            Types.SQL_TIMESTAMP(),
            Types.LIST(Types.ROW([
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.STRING(),
                Types.SQL_TIMESTAMP(),
                Types.SQL_TIMESTAMP(),
                Types.SQL_TIMESTAMP(),
                Types.BIG_INT()
            ]))
        )
        my_map_state_descriptor.enable_time_to_live(state_ttl_config)
        # This call fails at job startup
        self.my_map_state = runtime_context.get_map_state(my_map_state_descriptor)
When I run this code, it fails at job startup with the exception below, raised at runtime_context.get_map_state(my_map_state_descriptor), even before anything is added to the state.
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 249, in pyflink.fn_execution.beam.beam_operations_fast.StatefulFunctionOperation.__init__
  File "pyflink/fn_execution/beam/beam_operations_fast.pyx", line 132, in pyflink.fn_execution.beam.beam_operations_fast.FunctionOperation.__init__
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 127, in open
    self.open_func()
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/operations.py", line 296, in open_func
    process_function.open(runtime_context)
  File "/tmp/ipykernel_83481/1603226134.py", line 57, in open
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/datastream/process/runtime_context.py", line 125, in get_map_state
    map_coder = from_type_info(state_descriptor.type_info) # type: MapCoder
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 812, in from_type_info
    from_type_info(type_info._key_type_info),
    from_type_info(type_info._value_type_info))
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 809, in from_type_info
    return GenericArrayCoder(from_type_info(type_info.elem_type))
  File "/usr/local/lib64/python3.9/site-packages/pyflink/fn_execution/coders.py", line 819, in from_type_info
    [f for f in type_info.get_field_names()])
  File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 377, in get_field_names
    j_field_names = self.get_java_type_info().getFieldNames()
  File "/usr/local/lib64/python3.9/site-packages/pyflink/common/typeinfo.py", line 391, in get_java_type_info
    j_types_array = get_gateway()\
  File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 62, in get_gateway
    _gateway = launch_gateway()
  File "/usr/local/lib64/python3.9/site-packages/pyflink/java_gateway.py", line 86, in launch_gateway
    raise Exception("It's launching the PythonGatewayServer during Python UDF execution "
Exception: It's launching the PythonGatewayServer during Python UDF execution which is unexpected. It usually happens when the job codes are in the top level of the Python script file and are not enclosed in a `if __name__ == '__main__'` statement.

If I switch from Types.ROW() to Types.TUPLE(), it works without any exception.
This works:
my_map_state_descriptor = MapStateDescriptor(
    "my_map_state",
    Types.SQL_TIMESTAMP(),
    Types.LIST(Types.TUPLE([
        Types.STRING(),
        Types.STRING(),
        Types.STRING(),
        Types.STRING(),
        Types.STRING(),
        Types.STRING(),
        Types.STRING(),
        Types.STRING(),
        Types.SQL_TIMESTAMP(),
        Types.SQL_TIMESTAMP(),
        Types.SQL_TIMESTAMP(),
        Types.BIG_INT()
    ]))
)
--
This message was sent by Atlassian Jira
(v8.20.10#820010)