[ https://issues.apache.org/jira/browse/FLINK-39725?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
ASF GitHub Bot updated FLINK-39725:
-----------------------------------
Labels: pull-request-available (was: )
> from_source does not use timestamp assigner in pyflink
> ------------------------------------------------------
>
> Key: FLINK-39725
> URL: https://issues.apache.org/jira/browse/FLINK-39725
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 2.2.0
> Reporter: Andrew Feng
> Priority: Major
> Labels: pull-request-available
> Attachments: from_source.tar.gz
>
>
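> The custom TimestampAssigner passed to from_source through WatermarkStrategy.with_timestamp_assigner is never invoked: extract_timestamp is not called, so the exit(99) probe never fires. See main.py: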
>
> {code:python}
> from datetime import datetime
> from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors.file_system import FileSource
> from pyflink.common import WatermarkStrategy
> from pyflink.common import Row, Duration
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.table.types import DataTypes
> env = StreamExecutionEnvironment.get_execution_environment()
> input_path = "./input.csv"
>
> class DateTimeAssigner(TimestampAssigner):
>     def extract_timestamp(self, value: Row, record_timestamp: int) -> int:
>         exit(99)  # probe: never reached, because the assigner is not invoked
>         return int(datetime.strptime(value.date_time, "%Y-%m-%d %H:%M:%S").timestamp() * 1000)  # epoch millis
>
> schema = CsvSchema.builder() \
> .add_string_column("date_time") \
> .add_string_column("type") \
> .add_number_column("value", number_type=DataTypes.DOUBLE()) \
> .build()
> file_source = (
> FileSource
> .for_record_stream_format(CsvReaderFormat.for_schema(schema), input_path)
> .build()
> )
> ds = env.from_source(
> file_source,
> WatermarkStrategy
> .for_bounded_out_of_orderness(Duration.of_seconds(2))
> .with_timestamp_assigner(DateTimeAssigner()),
> "csv_source")
> ds.print()
> env.execute()
> {code}
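The assigner in main.py converts the `date_time` column with `datetime.strptime(...).timestamp()`. Because the parsed datetime is naive, `.timestamp()` interprets it in the local time zone of the process, so the resulting event timestamps would depend on where the worker runs. A minimal sketch of a deterministic conversion follows, assuming the CSV values are in UTC (an assumption; the report does not say). `to_epoch_millis` is a hypothetical helper name, and the code uses only the standard library so it can be checked without PyFlink:

```python
from datetime import datetime, timedelta, timezone

# Format of the "date_time" column in the report's CSV schema.
DATE_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_epoch_millis(date_time: str) -> int:
    """Convert a 'YYYY-MM-DD HH:MM:SS' string to epoch milliseconds.

    Assumption: the input is UTC. Attaching tzinfo explicitly avoids the
    local-time interpretation that a naive datetime's .timestamp() applies.
    """
    parsed = datetime.strptime(date_time, DATE_TIME_FORMAT).replace(tzinfo=timezone.utc)
    # timedelta // timedelta yields an int, so no float rounding is involved.
    return (parsed - EPOCH) // timedelta(milliseconds=1)


if __name__ == "__main__":
    print(to_epoch_millis("1970-01-01 00:00:01"))  # 1000
    print(to_epoch_millis("2024-01-01 00:00:00"))  # 1704067200000
```

Inside `DateTimeAssigner.extract_timestamp`, the return statement could call `to_epoch_millis(value.date_time)`. This only makes the conversion deterministic; it does not address the reported bug, since the assigner is never invoked by from_source.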