Dian Fu created FLINK-28253: ------------------------------- Summary: LocalDateTime is not supported in PyFlink Key: FLINK-28253 URL: https://issues.apache.org/jira/browse/FLINK-28253 Project: Flink Issue Type: Bug Components: API / Python Affects Versions: 1.15.0, 1.14.0 Reporter: Dian Fu
For the following job: {code} from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment from pyflink.table import EnvironmentSettings from pyflink.table.table_environment import StreamTableEnvironment if __name__ == '__main__': env = StreamExecutionEnvironment.get_execution_environment() settings = EnvironmentSettings.new_instance() \ .in_streaming_mode() \ .build() t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings) t_env.execute_sql(""" CREATE TABLE events ( `id` VARCHAR, `source` VARCHAR, `resources` VARCHAR, `time` TIMESTAMP(3), WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND, PRIMARY KEY (`id`) NOT ENFORCED ) WITH ( 'connector' = 'filesystem', 'path' = 'file:///path/to/input', 'format' = 'csv' ) """) events_stream_table = t_env.from_path('events') events_stream = t_env.to_data_stream(events_stream_table) # Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), Types.SQL_TIMESTAMP()]) # now do some processing - let's filter by the type of event we get codebuild_stream = events_stream.filter( lambda event: event['source'] == 'aws.codebuild' ) codebuild_stream.print() env.execute() {code} It will reports the following exception: {code} Traceback (most recent call last): File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", line 47, in <module> process() File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", line 39, in process lambda event: event['source'] == 'aws.codebuild' File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py", line 432, in filter self._j_data_stream.getTransformation().getOutputType()) File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1070, in _from_java_type TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType()))) File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1042, in _from_java_type j_row_field_types] File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1041, in <listcomp> row_field_types = [_from_java_type(j_row_field_type) for j_row_field_type in File "/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py", line 1072, in _from_java_type raise TypeError("The java type info: %s is not supported in PyFlink currently." % j_type_info) TypeError: The java type info: LocalDateTime is not supported in PyFlink currently. {code} -- This message was sent by Atlassian Jira (v8.20.7#820007)