[ 
https://issues.apache.org/jira/browse/FLINK-32040?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dian Fu updated FLINK-32040:
----------------------------
    Affects Version/s: 1.17.0
                       1.16.0

> The WatermarkStrategy defined with the Function(with_idleness) report an error
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-32040
>                 URL: https://issues.apache.org/jira/browse/FLINK-32040
>             Project: Flink
>          Issue Type: Bug
>          Components: API / Python
>    Affects Versions: 1.16.0, 1.17.0
>            Reporter: Joekwal
>            Priority: Major
>
> *Version:* upgraded from PyFlink 1.15.2 to PyFlink 1.16.1
>  
> *Error reported:*
> Record has Java Long.MIN_VALUE timestamp (= no timestamp marker). Is the time 
> characteristic set to 'ProcessingTime', or did you forget to call 
> 'data_stream.assign_timestamps_and_watermarks(...)'?
> The same application never reported this error on version 1.15.2.
>  
> *Example:*
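> {code:python}
> from pyflink.common import Duration, Time, WatermarkStrategy
> from pyflink.common.watermark_strategy import TimestampAssigner
> from pyflink.datastream.window import TumblingEventTimeWindows
>
> class MyTimestampAssigner(TimestampAssigner):
>     def extract_timestamp(self, value, record_timestamp: int) -> int:
>         # Use the record's 'version' field as the event timestamp.
>         return value['version']
>
> # 'version' is an epoch timestamp in milliseconds.
> sql = """
> select columns, version from kafka_source
> """
> # st_env is the application's StreamTableEnvironment.
> table = st_env.sql_query(sql)
> stream = st_env.to_changelog_stream(table)
> stream = stream.assign_timestamps_and_watermarks(
>     WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
>     .with_timestamp_assigner(MyTimestampAssigner())
>     .with_idleness(Duration.of_seconds(10)))
> # CommonKeySelector, WindowFunction and typeInfo are defined elsewhere in the application.
> stream = stream.key_by(CommonKeySelector()) \
>     .window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8))) \
>     .process(WindowFunction(), typeInfo)
> {code}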
>  
> Debugging into 
> ??pyflink.datastream.data_stream.DataStream.assign_timestamps_and_watermarks??
>  shows that ??watermark_strategy._timestamp_assigner?? is None.
> *Workaround:*
> Remove the call to ??with_idleness(Duration.of_seconds(10))??:
> {code:python}
> stream = stream.assign_timestamps_and_watermarks(
>     WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_minutes(1))
>     .with_timestamp_assigner(MyTimestampAssigner()))
> {code}
> Is this a bug?
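
The report that the timestamp assigner ends up as None once with_idleness is chained would be consistent with a builder method that returns a new strategy object and does not carry over state held on the Python side. The sketch below is a toy model of that failure mode only. ToyStrategy, with_idleness_lossy and with_idleness_preserving are hypothetical names, not PyFlink code, and the report does not confirm that this is the actual cause.

```python
# Toy model only: NOT the PyFlink source. It illustrates one hypothetical way
# a chained builder call could lose a Python-side timestamp assigner.

class ToyStrategy:
    def __init__(self, settings):
        self._settings = dict(settings)
        self._timestamp_assigner = None

    def with_timestamp_assigner(self, assigner):
        # Stores the assigner on the current object and returns that object.
        self._timestamp_assigner = assigner
        return self

    def with_idleness_lossy(self, seconds):
        # Hypothetical lossy variant: builds a fresh object, so the
        # previously stored assigner is not carried over.
        return ToyStrategy({**self._settings, "idleness": seconds})

    def with_idleness_preserving(self, seconds):
        # Hypothetical preserving variant: copies the assigner to the new object.
        new = ToyStrategy({**self._settings, "idleness": seconds})
        new._timestamp_assigner = self._timestamp_assigner
        return new


def assigner(value):
    # Stand-in for MyTimestampAssigner.extract_timestamp.
    return value["version"]


# Same call order as in the report: assigner first, idleness second.
lossy = ToyStrategy({"bound": 60}).with_timestamp_assigner(assigner).with_idleness_lossy(10)

# Reversed call order: idleness first, assigner second.
reordered = ToyStrategy({"bound": 60}).with_idleness_lossy(10).with_timestamp_assigner(assigner)

# Reported call order, but with the preserving variant.
preserving = ToyStrategy({"bound": 60}).with_timestamp_assigner(assigner).with_idleness_preserving(10)

print(lossy._timestamp_assigner is None)
print(reordered._timestamp_assigner is assigner)
print(preserving._timestamp_assigner is assigner)
```

If PyFlink 1.16 behaves like the lossy variant, calling with_idleness before with_timestamp_assigner might keep idleness detection while avoiding the error. That ordering is an untested assumption here, not something the report verifies.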



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
