[ 
https://issues.apache.org/jira/browse/FLINK-31373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Zhipeng Zhang updated FLINK-31373:
----------------------------------
    Description: 
Currently we use PerRoundWrapperOperator to wrap normal Flink operators so 
that they can be used in iterations.

Each record already carries epoch information, so we know which iteration it 
belongs to.

However, there is no epoch information when the stream element is a watermark. 
This works in most cases, but fails in the following use case:
 - In DataStreamUtils#withBroadcast, we cache the elements (including 
watermarks) from non-broadcast inputs until the broadcast variables are ready. 
Once the broadcast variables are ready, the next stream element we receive 
triggers processing of the cached elements first. If the received element is a 
watermark, the current implementation of the iteration module fails 
(ProxyOutput#collect throws an NPE) because there is no epoch information.

  was:
Currently we use `PerRoundWrapperOperator` to wrap the normal flink operators 
such that they can be used in iterations.


We already contained the epoch information in each record so that we know which 
iteration each record belongs to.

However, there is no epoch information when the stream element is a watermark. 
This works in most cases, but fail to address the following issue:

- In DataStreamUtils#withBroadcast, we will cache the elements (including 
watermarks) from non-broadcast inputs until the broadcast variables are ready. 
When the broadcast variables are ready, once we receive a stream element we 
will process the cached elements first. If the received element is a watermark, 
the current implementation of iteration module fails (`ProxyOutput#collect` 
throws NPE) since there is no epoch  information.


> PerRoundWrapperOperator should carry epoch information in watermark
> -------------------------------------------------------------------
>
>                 Key: FLINK-31373
>                 URL: https://issues.apache.org/jira/browse/FLINK-31373
>             Project: Flink
>          Issue Type: Bug
>          Components: Library / Machine Learning
>    Affects Versions: ml-2.2.0
>            Reporter: Zhipeng Zhang
>            Priority: Major
>
> Currently we use PerRoundWrapperOperator to wrap normal Flink operators so 
> that they can be used in iterations.
> Each record already carries epoch information, so we know which iteration it 
> belongs to.
> However, there is no epoch information when the stream element is a 
> watermark. This works in most cases, but fails in the following use case:
>  - In DataStreamUtils#withBroadcast, we cache the elements (including 
> watermarks) from non-broadcast inputs until the broadcast variables are 
> ready. Once the broadcast variables are ready, the next stream element we 
> receive triggers processing of the cached elements first. If the received 
> element is a watermark, the current implementation of the iteration module 
> fails (ProxyOutput#collect throws an NPE) because there is no epoch 
> information.
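
The failure mode described above can be illustrated with a minimal, 
self-contained sketch. It does not use any Flink classes: Element, 
EpochAwareOutput and CachingOperator are hypothetical stand-ins, and the 
assumption that the epoch context is taken from the element that triggers the 
replay is one reading of the description, not the actual Flink ML code.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

public class EpochWatermarkSketch {

    /** Hypothetical stand-in for a stream element (not a Flink class). */
    static final class Element {
        final String kind;    // "record" or "watermark"
        final long value;     // payload, or the watermark timestamp
        final Integer epoch;  // null models a watermark without epoch information

        Element(String kind, long value, Integer epoch) {
            this.kind = kind;
            this.value = value;
            this.epoch = epoch;
        }
    }

    /** Hypothetical output that tags every emitted value with the current epoch. */
    static final class EpochAwareOutput {
        Integer currentEpoch; // set by the operator before it processes an element
        final List<String> emitted = new ArrayList<>();

        void collect(long value) {
            // Auto-unboxing throws NullPointerException when currentEpoch is null.
            int epoch = currentEpoch;
            emitted.add(epoch + ":" + value);
        }
    }

    /** Caches elements until the broadcast side is ready, then replays them. */
    static final class CachingOperator {
        final Queue<Element> cache = new ArrayDeque<>();
        final EpochAwareOutput output = new EpochAwareOutput();
        boolean broadcastReady = false;

        void onElement(Element e) {
            if (!broadcastReady) {
                cache.add(e);
                return;
            }
            // Assumption: the epoch context comes from the triggering element.
            output.currentEpoch = e.epoch;
            while (!cache.isEmpty()) {
                Element cached = cache.poll();
                if (cached.kind.equals("record")) {
                    output.collect(cached.value);
                }
            }
            if (e.kind.equals("record")) {
                output.collect(e.value);
            }
        }
    }

    /** Returns true if replaying one cached record fails for the given trigger. */
    static boolean replayFails(Element trigger) {
        CachingOperator op = new CachingOperator();
        op.onElement(new Element("record", 1L, 0)); // cached: broadcast not ready
        op.broadcastReady = true;
        try {
            op.onElement(trigger);
            return false;
        } catch (NullPointerException npe) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(replayFails(new Element("watermark", 100L, null)));
        System.out.println(replayFails(new Element("watermark", 100L, 0)));
    }
}
```

In this sketch the replay fails only when the triggering watermark has no 
epoch; once the watermark carries one, the cached record is emitted normally, 
which is the behaviour the issue title asks for.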



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
