Dian Fu created FLINK-28253:
-------------------------------

             Summary: LocalDateTime is not supported in PyFlink
                 Key: FLINK-28253
                 URL: https://issues.apache.org/jira/browse/FLINK-28253
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 1.15.0, 1.14.0
            Reporter: Dian Fu


For the following job:
{code}
from pyflink.datastream.stream_execution_environment import 
StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings
from pyflink.table.table_environment import StreamTableEnvironment


if __name__ == '__main__':
    env = StreamExecutionEnvironment.get_execution_environment()
    settings = EnvironmentSettings.new_instance() \
        .in_streaming_mode() \
        .build()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env, 
environment_settings=settings)

    t_env.execute_sql("""
            CREATE TABLE events (
                 `id` VARCHAR,
                 `source` VARCHAR,
                 `resources` VARCHAR,
                 `time` TIMESTAMP(3),
                 WATERMARK FOR `time` as `time` - INTERVAL '30' SECOND,
                 PRIMARY KEY (`id`) NOT ENFORCED
            ) WITH (
                'connector' = 'filesystem',
                 'path' = 'file:///path/to/input',
                 'format' = 'csv'
            )
        """)

    events_stream_table = t_env.from_path('events')

    events_stream = t_env.to_data_stream(events_stream_table)
    #  Types.ROW([Types.STRING(), Types.STRING(), Types.STRING(), 
Types.SQL_TIMESTAMP()])

    # now do some processing - let's filter by the type of event we get

    codebuild_stream = events_stream.filter(
        lambda event: event['source'] == 'aws.codebuild'
    )

    codebuild_stream.print()
    env.execute()
{code}

It will reports the following exception:
{code}
Traceback (most recent call last):
  File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", 
line 47, in <module>
    process()
  File "/Users/dianfu/code/src/workspace/pyflink-examples/tests/test_2.py", 
line 39, in process
    lambda event: event['source'] == 'aws.codebuild'
  File 
"/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/datastream/data_stream.py",
 line 432, in filter
    self._j_data_stream.getTransformation().getOutputType())
  File 
"/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py",
 line 1070, in _from_java_type
    TypeInfoDataTypeConverter.toLegacyTypeInfo(j_type_info.getDataType())))
  File 
"/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py",
 line 1042, in _from_java_type
    j_row_field_types]
  File 
"/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py",
 line 1041, in <listcomp>
    row_field_types = [_from_java_type(j_row_field_type) for j_row_field_type in
  File 
"/Users/dianfu/venv/examples-37/lib/python3.7/site-packages/pyflink/common/typeinfo.py",
 line 1072, in _from_java_type
    raise TypeError("The java type info: %s is not supported in PyFlink 
currently." % j_type_info)
TypeError: The java type info: LocalDateTime is not supported in PyFlink 
currently.
{code}



--
This message was sent by Atlassian Jira
(v8.20.7#820007)

Reply via email to