Andrew Feng created FLINK-39725:
-----------------------------------

             Summary: PyFlink: from_source does not use the TimestampAssigner of its WatermarkStrategy
                 Key: FLINK-39725
                 URL: https://issues.apache.org/jira/browse/FLINK-39725
             Project: Flink
          Issue Type: Bug
          Components: API / Python
    Affects Versions: 2.2.0
            Reporter: Andrew Feng
         Attachments: from_source.tar.gz

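Reproduction script (main.py). The exit(99) probe in DateTimeAssigner.extract_timestamp is never reached, which shows that the timestamp assigner passed to env.from_source is never called: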

 
{code:python}
from datetime import datetime

from pyflink.common import WatermarkStrategy
from pyflink.common import Row, Duration
from pyflink.common.watermark_strategy import TimestampAssigner
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors.file_system import FileSource
from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
from pyflink.table.types import DataTypes

# CSV file to read, resolved relative to the current working directory.
input_path = "./input.csv"

env = StreamExecutionEnvironment.get_execution_environment()

class DateTimeAssigner(TimestampAssigner):
    """Event-time assigner that reads the ``date_time`` column of each CSV row.

    Attached to the WatermarkStrategy handed to ``env.from_source`` in order
    to reproduce FLINK-39725: the reporter observes that ``extract_timestamp``
    is never invoked in that setup.
    """

    def extract_timestamp(self, value: Row, record_timestamp: int) -> int:
        """Return the row's ``date_time`` field as epoch milliseconds.

        ``value.date_time`` is parsed with the format ``%Y-%m-%d %H:%M:%S``.
        The resulting datetime is naive, so ``.timestamp()`` interprets it in
        the local time zone of the process running this function.
        NOTE(review): confirm whether the CSV times are meant to be UTC.
        ``record_timestamp`` is ignored.
        """
        # Probe for the bug report: if Flink ever calls this assigner, the
        # Python process exits with status 99. The reporter observed that it
        # never happens with from_source.
        exit(99)  # this line does not execute
        # The format string was previously split across two lines (an
        # unterminated string literal, i.e. a SyntaxError); it is a single
        # literal here.
        return int(
            datetime.strptime(value.date_time, "%Y-%m-%d %H:%M:%S").timestamp()
            * 1000
        )  # epoch millis

    
# Column layout expected in the input CSV: date_time, type, value.
schema = (
    CsvSchema.builder()
    .add_string_column("date_time")
    .add_string_column("type")
    .add_number_column("value", number_type=DataTypes.DOUBLE())
    .build()
)

# Record-stream file source that decodes each CSV line with the schema above.
csv_format = CsvReaderFormat.for_schema(schema)
file_source = FileSource.for_record_stream_format(csv_format, input_path).build()

# Watermarks tolerate 2 s of out-of-orderness; event time should come from
# DateTimeAssigner. FLINK-39725: when this strategy is passed to from_source,
# the Python assigner is never called.
# NOTE(review): applying the strategy afterwards via
# ds.assign_timestamps_and_watermarks(...) may be a workaround; unverified.
watermark_strategy = WatermarkStrategy.for_bounded_out_of_orderness(
    Duration.of_seconds(2)
).with_timestamp_assigner(DateTimeAssigner())

ds = env.from_source(file_source, watermark_strategy, "csv_source")

ds.print()
env.execute()

{code}



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to