[ 
https://issues.apache.org/jira/browse/FLINK-5017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15636743#comment-15636743
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-5017:
--------------------------------------------

I had considered this, by defining the values "-1" and "-2" as special
timestamp values that represent idle / active watermark status.
However, I wasn't sure that reserving "-1" and "-2" was a good idea, or
whether it would interfere with other parts of the system or affect any
future extensions. Considering only the purpose stated here, it should be
possible with a bit of finesse; the only concern is that it may hurt code
readability (we would basically need to check whether watermarks are
actually these two special values in every place where we work with them).
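
To make the readability concern concrete, here is a minimal sketch of the
special-value approach. It is not Flink code: the class
{{SentinelWatermarks}} and all of its members are hypothetical, and which of
"-1" / "-2" stands for idle versus active is an assumption, since the
mapping is not fixed above.

```java
// Hypothetical sketch, not Flink API: status is encoded as reserved timestamps.
final class SentinelWatermarks {
    // Assumption: -1 means idle and -2 means active.
    static final long IDLE = -1L;
    static final long ACTIVE = -2L;

    private long currentWatermark = Long.MIN_VALUE;
    private boolean idle = false;

    // Every place that handles a watermark has to rule out the sentinels first;
    // otherwise -1 or -2 would be treated as an ordinary timestamp.
    void processWatermark(long timestamp) {
        if (timestamp == IDLE) {
            idle = true;
            return;
        }
        if (timestamp == ACTIVE) {
            idle = false;
            return;
        }
        if (timestamp > currentWatermark) {
            currentWatermark = timestamp;
        }
    }

    long getCurrentWatermark() {
        return currentWatermark;
    }

    boolean isIdle() {
        return idle;
    }
}
```

The two early returns are the checks that would have to be repeated wherever
watermarks are consumed.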

I think a separate {{WatermarkStatus}} class has another readability
advantage: we can simply keep a {{currentWatermarkStatus}} variable in
operators that is set to the last emitted {{WatermarkStatus}}, which
naturally reflects whether the operator is currently watermark-idle or
watermark-active.
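
A rough sketch of that idea follows. Only the names {{WatermarkStatus}},
{{currentWatermarkStatus}}, {{IDLE}} and {{ACTIVE}} come from this issue;
the class {{StatusTrackingOperator}}, the emission counter that stands in
for a real output, and the choice to start out active are assumptions made
for illustration.

```java
// Hypothetical sketch of a dedicated status element; not the final Flink class.
final class WatermarkStatus {
    static final WatermarkStatus IDLE = new WatermarkStatus("IDLE");
    static final WatermarkStatus ACTIVE = new WatermarkStatus("ACTIVE");

    private final String name;

    private WatermarkStatus(String name) {
        this.name = name;
    }

    boolean isIdle() {
        return this == IDLE;
    }

    boolean isActive() {
        return this == ACTIVE;
    }

    @Override
    public String toString() {
        return "WatermarkStatus(" + name + ")";
    }
}

// Hypothetical operator that remembers the last status it emitted.
final class StatusTrackingOperator {
    // Assumption: an operator is watermark-active until told otherwise.
    private WatermarkStatus currentWatermarkStatus = WatermarkStatus.ACTIVE;
    // Stand-in for a real output: counts how many status elements were emitted.
    private int emittedCount = 0;

    // Emits only when the status actually changes.
    void emitWatermarkStatus(WatermarkStatus status) {
        if (status != currentWatermarkStatus) {
            currentWatermarkStatus = status;
            emittedCount++;
        }
    }

    WatermarkStatus getCurrentWatermarkStatus() {
        return currentWatermarkStatus;
    }

    int getEmittedCount() {
        return emittedCount;
    }
}
```

With this shape, asking whether an operator is idle is a single
{{getCurrentWatermarkStatus().isIdle()}} call, with no special timestamp
values involved.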

Either way, let me know what you think :) I'm not against extending 
{{Watermark}}, so I'm still open to going for that approach if you think the 
new methods are excessive.

> Introduce WatermarkStatus stream element to allow for temporarily idle 
> streaming sources
> ----------------------------------------------------------------------------------------
>
>                 Key: FLINK-5017
>                 URL: https://issues.apache.org/jira/browse/FLINK-5017
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0
>
>
> A {{WatermarkStatus}} element informs receiving operators whether or not they 
> should continue to expect watermarks from the sending operator. There are 2 
> kinds of status, namely {{IDLE}} and {{ACTIVE}}. Watermark status elements 
> are generated at the sources, and may be propagated through the operators of 
> the topology using {{Output#emitWatermarkStatus(WatermarkStatus)}}.
> Sources and downstream operators should emit the corresponding status 
> element whenever they change between the "watermark-idle" and 
> "watermark-active" states.
> A source is considered "watermark-idle" if it will not emit records for an 
> indefinite amount of time. This is the case, for example, for Flink's Kafka 
> Consumer, where sources might initially have no assigned partitions to read 
> from, or no records can be read from the assigned partitions. Once the source 
> detects that it will resume emitting data, it is considered 
> "watermark-active".
> Downstream operators with multiple inputs (e.g. head operators of a 
> {{OneInputStreamTask}} or {{TwoInputStreamTask}}) should not wait for 
> watermarks from an upstream operator that is "watermark-idle" when deciding 
> whether or not to advance the operator's current watermark. When a downstream 
> operator determines that all upstream operators are "watermark-idle" (i.e. 
> when all input channels have received the watermark idle status element), 
> then the operator is considered to also be "watermark-idle", as it will 
> temporarily be unable to advance its own watermark. This is always the case 
> for operators that only read from a single upstream operator. Once an 
> operator is considered "watermark-idle", it should itself forward its idle 
> status to inform downstream operators. The operator is considered to be back 
> to "watermark-active" as soon as at least one of its upstream operators 
> becomes "watermark-active" again (i.e. when at least one input channel 
> receives the watermark active status element), and should also forward its 
> active status to inform downstream operators.
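
The propagation rule in the description above can be sketched as follows.
This is only an illustration under stated assumptions: {{InputStatusTracker}}
and its methods are hypothetical names rather than Flink API, input
channels are identified by index, and all channels are assumed to start out
active.

```java
// Hypothetical sketch of per-channel status tracking; not Flink API.
final class InputStatusTracker {
    // channelIdle[i] is true if the last status received on channel i was idle.
    private final boolean[] channelIdle;
    private int idleChannels = 0;
    private boolean operatorIdle = false;

    InputStatusTracker(int numChannels) {
        if (numChannels < 1) {
            throw new IllegalArgumentException("need at least one input channel");
        }
        // Assumption: every channel starts out watermark-active.
        channelIdle = new boolean[numChannels];
    }

    // Records a status element received on one channel.
    // Returns true if the operator's own status changed, in which case the
    // new status should be forwarded downstream.
    boolean onStatus(int channel, boolean idle) {
        if (channelIdle[channel] == idle) {
            return false; // nothing changed on this channel
        }
        channelIdle[channel] = idle;
        idleChannels += idle ? 1 : -1;

        // Idle only when all inputs are idle; active as soon as one is active.
        boolean nowIdle = idleChannels == channelIdle.length;
        if (nowIdle == operatorIdle) {
            return false;
        }
        operatorIdle = nowIdle;
        return true;
    }

    boolean isOperatorIdle() {
        return operatorIdle;
    }

    // Idle channels are skipped when deciding whether to advance the watermark.
    boolean countsTowardWatermark(int channel) {
        return !channelIdle[channel];
    }
}
```

With a single input channel, the first idle status immediately makes the
operator idle, which matches the single-upstream case described above.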



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
