Dian Fu created FLINK-28253:
-------------------------------
Summary: LocalDateTime is not supported in PyFlink
Key: FLINK-28253
URL: https://issues.apache.org/jira/browse/FLINK-28253
Project: Flink
Issue Type: Bug
Components: API / Python
Affects Versions: 1.15.0, 1.14.0
Reporter: Dian Fu
For the following job:
{code}
from pyflink.datastream.stream_execution_environment import
StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.table_environment import StreamTableEnvironment
if __name__ == '__main__':
env = StreamExecutionEnvironment.get_execution_environment()
settings = EnvironmentSettings.new_instance() \
.in_streaming_mode() \
.build()
t_env = StreamTableEnvironment.create(stream_execution_environment=env,
environment_settings=settings)
t_env.execute_sql("""
CREATE TABLE events (
`id` VARCHAR,
`source` VARCHAR,
`resources` VARCHAR,
`time` TIMESTAMP(3),
WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
PRIMARY KEY (`id`) NOT ENFORCED
) WITH (
'connector' = 'filesystem',
'path' = 'file:///path/to/input',
'format' = 'csv'
)
""")
events_stream_table = t_env.from_path('events')
events_stream = t_env.to_data_stream(events_stream_table)
# Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(),
Types.SQL_TIMESTAMP()])
# now do some processing - let's filter by the type of event we get
codebuild_stream = events_stream.filter(
lambda event: event['source'] == 'aws.codebuild'
)
codebuild_stream.print()
env.execute()
{code}
It will reports the following exception:
{code}
Traceback (most recent call last):
File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py",
line 47, in <module>
process()
File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py",
line 39, in process
lambda event: event['source'] == 'aws.codebuild'
File
"/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py",
line 432, in filter
self._j_data_stream.getTransformation().getOutputType())
File
"/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py",
line 1070, in _from_java_type
TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType())))
File
"/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py",
line 1042, in _from_java_type
j_row_field_types]
File
"/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py",
line 1041, in <listcomp>
row_field_types = [_from_java_type(j_row_field_type) for j_row_field_type in
File
"/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py",
line 1072, in _from_java_type
raise TypeError("The java type info: %s is not supported in PyFlink
currently." % j_type_info)
TypeError: The java type info: LocalDateTime is not supported in PyFlink
currently.
{code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)