[ https://issues.apache.org/jira/browse/FLINK-39725?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18090095#comment-18090095 ]
Swati Gupta commented on FLINK-39725:
-------------------------------------
Hey, I spent some time on this before jumping to a PR.
I ran the attached main.py, swapping exit(99) for a quick file write so I
could actually see whether extract_timestamp runs. It doesn't: the job runs
fine and prints everything, but the marker file never shows up. I also tried
swapping FileSource for NumberSequenceSource and the same thing happens, so
it's not just a CSV/FileSource quirk.
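The marker-file check described above can be sketched roughly as follows. This is a minimal, stdlib-only sketch and not the code that was actually run: the helper name extract_timestamp_with_marker, the marker file name, and the choice to parse the timestamp as UTC are all assumptions made for illustration (the attached main.py parses in local time).

```python
from datetime import datetime, timezone
from pathlib import Path


def extract_timestamp_with_marker(date_time: str,
                                  marker_path: str = "assigner_called.marker") -> int:
    """Hypothetical stand-in for the body of extract_timestamp.

    Instead of calling exit(99), leave a file on disk so that a later check
    can tell whether the assigner was ever invoked.
    """
    # Evidence that the function ran at least once.
    Path(marker_path).write_text("extract_timestamp was called\n")

    # Assumption: treat the CSV timestamp as UTC so the result does not
    # depend on the machine's local time zone.
    parsed = datetime.strptime(date_time, "%Y-%m-%d %H:%M:%S")
    parsed = parsed.replace(tzinfo=timezone.utc)
    return int(parsed.timestamp() * 1000)  # epoch millis
```

Inside the assigner this would be called as extract_timestamp_with_marker(value.date_time). If the marker file is still missing after the job finishes, the assigner was never invoked.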
As a sanity check I tried the older style (from_collection +
assign_timestamps_and_watermarks), and there the assigner works fine.
Digging into the pyflink code, it looks like with_timestamp_assigner() just
stores your assigner on the Python object and never passes it down to the
Java side. from_source() only forwards the Java watermark strategy, so it
never looks for the Python assigner. assign_timestamps_and_watermarks() does
check for it, which is why that path works.
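The difference between the two paths can be illustrated with a toy model. This is not the pyflink source: every class and function name below (ToyJavaStrategy, ToyWatermarkStrategy, toy_from_source, toy_assign_timestamps_and_watermarks) is made up, and the model only mirrors the behavior described above.

```python
class ToyJavaStrategy:
    """Hypothetical stand-in for the Java-side watermark strategy."""


class ToyWatermarkStrategy:
    """Hypothetical Python wrapper around a Java strategy."""

    def __init__(self):
        self.java_strategy = ToyJavaStrategy()
        self.timestamp_assigner = None

    def with_timestamp_assigner(self, assigner):
        # The assigner is stored on the Python wrapper only; the Java
        # strategy object is left untouched.
        self.timestamp_assigner = assigner
        return self


def toy_from_source(strategy, records):
    # Only the Java strategy is forwarded, so the Python assigner is never
    # consulted and every record keeps an unset (None) timestamp.
    _forwarded = strategy.java_strategy
    return [(record, None) for record in records]


def toy_assign_timestamps_and_watermarks(strategy, records):
    # This path explicitly checks the wrapper for a Python assigner.
    assigner = strategy.timestamp_assigner
    if assigner is None:
        return [(record, None) for record in records]
    return [(record, assigner(record)) for record in records]
```

In this model, a strategy built with with_timestamp_assigner() yields timestamps through toy_assign_timestamps_and_watermarks() but not through toy_from_source(), which matches the symptom in the report.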
Before I go any further, there are a couple of things I wanted to ask:
1. Is this expected behavior for some reason I'm not seeing, or does it
genuinely look like a bug to you too?
2. If it's a bug, do you have a preference on how it should be fixed: handled
inside from_source() directly, or by reusing the same logic
assign_timestamps_and_watermarks() already has?
> from_source does not use timestamp assigner in pyflink
> ------------------------------------------------------
>
> Key: FLINK-39725
> URL: https://issues.apache.org/jira/browse/FLINK-39725
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Affects Versions: 2.2.0
> Reporter: Andrew Feng
> Priority: Major
> Attachments: from_source.tar.gz
>
>
> See main.py:
>
> {code:python}
> from datetime import datetime
>
> from pyflink.datastream.formats.csv import CsvSchema, CsvReaderFormat
> from pyflink.datastream import StreamExecutionEnvironment
> from pyflink.datastream.connectors.file_system import FileSource
> from pyflink.common import WatermarkStrategy
> from pyflink.common import Row, Duration
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.table.types import DataTypes
>
> env = StreamExecutionEnvironment.get_execution_environment()
> input_path = "./input.csv"
>
> class DateTimeAssigner(TimestampAssigner):
>     def extract_timestamp(self, value: Row, record_timestamp: int) -> int:
>         exit(99)  # this line does not execute
>         parsed = datetime.strptime(value.date_time, "%Y-%m-%d %H:%M:%S")
>         return int(parsed.timestamp() * 1000)  # epoch millis
>
> schema = CsvSchema.builder() \
>     .add_string_column("date_time") \
>     .add_string_column("type") \
>     .add_number_column("value", number_type=DataTypes.DOUBLE()) \
>     .build()
> file_source = (
>     FileSource
>     .for_record_stream_format(CsvReaderFormat.for_schema(schema), input_path)
>     .build()
> )
> ds = env.from_source(
>     file_source,
>     WatermarkStrategy
>     .for_bounded_out_of_orderness(Duration.of_seconds(2))
>     .with_timestamp_assigner(DateTimeAssigner()),
>     "csv_source")
> ds.print()
> env.execute()
> {code}