[ 
https://issues.apache.org/jira/browse/FLINK-39725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

ASF GitHub Bot updated FLINK-39725:
-----------------------------------
    Labels: pull-request-available  (was: )

> from_source does not use timestamp assigner in pyflink
> ------------------------------------------------------
>
>                 Key: FLINK-39725
>                 URL: https://issues.apache.org/jira/browse/FLINK-39725
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 2.2.0
>            Reporter: Andrew Feng
>            Priority: Major
>              Labels: pull-request-available
>         Attachments: from_source.tar.gz
>
>
> See main.py:
>  
> {code:python}
> from datetime import datetime
> from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors.file_system import FileSource
> from pyflink.common import WatermarkStrategy
> from pyflink.common import Row, Duration
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.table.types import DataTypes
> env = StreamExecutionEnvironment.get_execution_environment()
> input_path = "./input.csv"
> class DateTimeAssigner(TimestampAssigner):
>     def extract_timestamp(self, value: Row, record_timestamp: int) -> int:
>         exit(99)  # never reached: the assigner is not invoked
>         return int(datetime.strptime(value.date_time, "%Y-%m-%d %H:%M:%S").timestamp() * 1000)  # epoch millis
>     
> schema = CsvSchema.builder() \
>     .add_string_column("date_time") \
>     .add_string_column("type") \
>     .add_number_column("value", number_type=DataTypes.DOUBLE()) \
>     .build()
> file_source = (
>     FileSource
>     .for_record_stream_format(CsvReaderFormat.for_schema(schema), input_path)
>     .build()
> )
> ds = env.from_source(
>     file_source,
>     WatermarkStrategy
>     .for_bounded_out_of_orderness(Duration.of_seconds(2))
>     .with_timestamp_assigner(DateTimeAssigner()),
>     "csv_source")
> ds.print()
> env.execute()
> {code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
