[
https://issues.apache.org/jira/browse/FLINK-32040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Dian Fu updated FLINK-32040:
----------------------------
Priority: Major (was: Blocker)
> The WatermarkStrategy defined with the Function(with_idleness) report an error
> ------------------------------------------------------------------------------
>
> Key: FLINK-32040
> URL: https://issues.apache.org/jira/browse/FLINK-32040
> Project: Flink
> Issue Type: Bug
> Components: API / Python
> Reporter: Joekwal
> Priority: Major
>
> *Version:* upgraded from PyFlink 1.15.2 to PyFlink 1.16.1
>
> *Error reported:*
> Record has Java Long.MIN_VALUE timestamp (= no timestamp marker). Is the time
> characteristic set to 'ProcessingTime', or did you forget to call
> 'data_stream.assign_timestamps_and_watermarks(...)'?
> The same application never reported this error on version 1.15.2.
>
> *Example:*
> {code:python}
> from pyflink.common import Duration, Time, WatermarkStrategy
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.datastream.window import TumblingEventTimeWindows
>
> # st_env, CommonKeySelector, WindowFunction and typeInfo are defined
> # elsewhere in the application.
> class MyTimestampAssigner(TimestampAssigner):
>     def extract_timestamp(self, value, record_timestamp: int) -> int:
>         # 'version' holds the event time in milliseconds
>         return value['version']
>
> sql = """
> select columns, version from kafka_source
> """
> table = st_env.sql_query(sql)
> stream = st_env.to_changelog_stream(table)
> stream = stream.assign_timestamps_and_watermarks(
>     WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
>     .with_timestamp_assigner(MyTimestampAssigner())
>     .with_idleness(Duration.of_seconds(10)))
> stream = stream.key_by(CommonKeySelector()) \
>     .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) \
>     .process(WindowFunction(), typeInfo)
> {code}
>
> Debugging into
> ??pyflink.datastream.data_stream.DataStream.assign_timestamps_and_watermarks??
> shows that ??watermark_strategy._timestamp_assigner?? is None.
> *Workaround:*
> Removing the call to ??with_idleness(Duration.of_seconds(10))?? makes the error go away:
> {code:python}
> stream = stream.assign_timestamps_and_watermarks(
>     WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
>     .with_timestamp_assigner(MyTimestampAssigner()))
> {code}
> Is this a bug?
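
The debugging observation above (the timestamp assigner is None once with_idleness has been chained) is consistent with a builder method that returns a fresh wrapper object without copying Python-only state. The following is a minimal, self-contained sketch of that failure mode. It does not use PyFlink: ToyStrategy, with_idleness_lossy and with_idleness_preserving are hypothetical names invented for illustration, and whether PyFlink 1.16.1 is implemented this way is an assumption that this report does not confirm.

```python
class ToyStrategy:
    """Hypothetical stand-in for a Python wrapper around a JVM-side strategy."""

    def __init__(self, java_side):
        self._java_side = java_side      # pretend handle to the JVM object
        self._timestamp_assigner = None  # state that lives only on the Python side

    def with_timestamp_assigner(self, assigner):
        # Stores the assigner on this Python object and returns the same object.
        self._timestamp_assigner = assigner
        return self

    def with_idleness_lossy(self, timeout_s):
        # Builds a fresh wrapper; the Python-only assigner is not copied across.
        return ToyStrategy(self._java_side + [("idleness", timeout_s)])

    def with_idleness_preserving(self, timeout_s):
        # Same as above, but carries the assigner over to the new wrapper.
        new = ToyStrategy(self._java_side + [("idleness", timeout_s)])
        new._timestamp_assigner = self._timestamp_assigner
        return new


def assigner(value, record_timestamp):
    # Mirrors MyTimestampAssigner.extract_timestamp from the report.
    return value["version"]


# Assigner first, then idleness: the lossy variant drops the assigner.
lossy = ToyStrategy([]).with_timestamp_assigner(assigner).with_idleness_lossy(10)
# Assigner first, then idleness: the preserving variant keeps it.
kept = ToyStrategy([]).with_timestamp_assigner(assigner).with_idleness_preserving(10)
# Idleness first, then assigner: even the lossy variant keeps it.
reordered = ToyStrategy([]).with_idleness_lossy(10).with_timestamp_assigner(assigner)

print(lossy._timestamp_assigner is None)          # True
print(kept._timestamp_assigner is assigner)       # True
print(reordered._timestamp_assigner is assigner)  # True
```

In this toy model, calling the idleness method before with_timestamp_assigner also keeps the assigner. Whether the same reordering avoids the error in PyFlink 1.16.1 has not been verified here.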
--
This message was sent by Atlassian Jira
(v8.20.10#820010)