[ 
https://issues.apache.org/jira/browse/FLINK-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15635107#comment-15635107
 ] 

Tzu-Li (Gordon) Tai commented on FLINK-4576:
--------------------------------------------

I think the general problem with a central watermark service that coordinates 
the global watermark of an operator's instances is that it breaks the principle 
that "event watermarks are derived from the data itself".

Addressing [~aljoscha]'s concerns:
I think having sources additionally send a "watermark reactivate" message 
should address some of the issues. Its purpose is to keep elements that arrive 
after the "watermark reactivate" message from being naively treated as late.

- For subtasks with partitions that don't produce new elements for a while, I 
think the source itself should implement a threshold that, when reached, causes 
a "watermark idle" message to be emitted downstream. As soon as the source 
subtask detects that it will start producing data again, it should emit a 
"watermark reactivate" message. As a result, actual watermarks will again be 
produced from the data of that source subtask (or by the timestamp 
extractors/watermark assigners that read from that source subtask).
- In the case that subtasks initially don't have partitions and get assigned 
one afterwards, they should behave as described above.
- Concerning rogue timestamp extractors/watermark assigners in the middle of a 
topology that cause some instances to stop producing watermarks indefinitely: 
sticking to the principle that "event watermarks are derived from the data 
itself", I think it's the user implementation's responsibility to ensure this 
doesn't happen. As long as data is flowing through the timestamp 
extractor/watermark assigner instance, the implementation should guarantee that 
watermarks make progress. When no data is flowing through the 
extractor/watermark assigner instance, this should be taken care of internally 
by the "watermark idle" / "watermark reactivate" messages sent from the sources.
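
To make the idle / reactivate idea above more concrete, here is a minimal 
sketch of how a downstream operator could combine watermarks from several 
input channels while skipping idle ones. The class and method names 
(IdleAwareWatermarkTracker, onIdle, onReactivate) are hypothetical and not 
existing Flink API, and the sketch assumes that a reactivated channel simply 
holds the combined watermark back until it has caught up. How elements from a 
reactivated channel are protected from being treated as late is not covered.

```java
import java.util.Arrays;
import java.util.OptionalLong;

/**
 * Hypothetical sketch, not Flink API: combines per-channel watermarks into one
 * output watermark, ignoring channels that announced "watermark idle".
 */
final class IdleAwareWatermarkTracker {
    private final long[] channelWatermarks;
    private final boolean[] idle;
    private long lastEmitted = Long.MIN_VALUE;

    IdleAwareWatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        idle = new boolean[numChannels];
    }

    /** A regular watermark arrived on the given channel. */
    OptionalLong onWatermark(int channel, long watermark) {
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], watermark);
        return advance();
    }

    /** "watermark idle": stop waiting for this channel. */
    OptionalLong onIdle(int channel) {
        idle[channel] = true;
        return advance();
    }

    /** "watermark reactivate": take this channel into account again. */
    OptionalLong onReactivate(int channel) {
        idle[channel] = false;
        return advance();
    }

    /** Emits the minimum over active channels, but only if it moves forward. */
    private OptionalLong advance() {
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!idle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        if (anyActive && min > lastEmitted) {
            lastEmitted = min;
            return OptionalLong.of(min);
        }
        return OptionalLong.empty();
    }
}
```

With two channels, a watermark of 10 on channel 0 alone emits nothing, because 
channel 1 has not reported yet. Once channel 1 sends "watermark idle", the 
combined watermark advances to 10 and then follows channel 0 until channel 1 
reactivates.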

What do you think?

> Low Watermark Service in JobManager for Streaming Sources
> ---------------------------------------------------------
>
>                 Key: FLINK-4576
>                 URL: https://issues.apache.org/jira/browse/FLINK-4576
>             Project: Flink
>          Issue Type: New Feature
>          Components: JobManager, Streaming, TaskManager
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0
>
>
> As per discussion in FLINK-4341 by [~aljoscha] and [~StephanEwen], we need a 
> low watermark service in the JobManager to support transparent resharding / 
> partition discovery for our Kafka and Kinesis consumers (and any future 
> streaming connectors in general for which the external system may elastically 
> scale up and down independently of the parallelism of sources in Flink). The 
> main idea is to let source subtasks that don't emit their own watermarks 
> (because they currently don't have data partitions to consume) emit the low 
> watermark across all subtasks, instead of simply emitting a Long.MAX_VALUE 
> watermark and forbidding them to be assigned partitions in the future.
> The proposed implementation, at a high level: a {{LowWatermarkCoordinator}} 
> will be added to execution graphs, periodically triggering only the source 
> vertices with a {{RetrieveLowWatermark}} message. The tasks reply to the 
> JobManager through the actor gateway (or a new interface after FLINK-4456 
> gets merged) with a {{ReplyLowWatermark}} message. When the coordinator 
> collects all low watermarks for a particular source vertex and determines the 
> aggregated low watermark for this round (considering only values that are 
> larger than the aggregated low watermark of the last round), it sends a 
> {{NotifyNewLowWatermark}} message to the source vertex's tasks.
> The messages will only be relevant to tasks that implement an internal 
> {{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}} 
> should implement {{LowWatermarkCooperatingTask}}.
> Source functions should implement a public {{LowWatermarkListener}} interface 
> if they wish to get notified of the aggregated low watermarks across 
> subtasks. Connectors like the Kinesis consumer can choose to emit this 
> watermark if the subtask currently does not have any shards, so that 
> downstream operators may still properly advance time windows (implementation 
> for this is tracked as a separate issue).
> Overall, the service will include:
> - New messages between JobManager <-> TaskManager:
>   - {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
>   - {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
>   - {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark, 
>     timestamp)}}
> - New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
> - New public interface {{LowWatermarkListener}} in flink-streaming-java
> Might also need to extend {{SourceFunction.SourceContext}} to support 
> retrieving the current low watermark of sources.
> Any feedback for this is appreciated!
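
For reference, the per-round aggregation described in the quoted proposal 
could look roughly like the sketch below. It follows one literal reading of 
the description: once every subtask has replied, take the minimum of the 
replies that are larger than the previous round's aggregate. The class name 
LowWatermarkAggregator and its methods are hypothetical, and the sketch 
assumes that a subtask without a watermark replies with Long.MIN_VALUE.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/**
 * Hypothetical sketch of the coordinator-side aggregation for one source
 * vertex. Not actual Flink code.
 */
final class LowWatermarkAggregator {
    private final int parallelism;
    private final Map<Integer, Long> replies = new HashMap<>();
    private long lastAggregated = Long.MIN_VALUE;

    LowWatermarkAggregator(int parallelism) {
        this.parallelism = parallelism;
    }

    /**
     * Records one subtask's reply. Returns the new aggregated low watermark
     * once all subtasks have replied and at least one value qualifies.
     */
    OptionalLong onReply(int subtaskIndex, long currentLowWatermark) {
        replies.put(subtaskIndex, currentLowWatermark);
        if (replies.size() < parallelism) {
            return OptionalLong.empty(); // round not complete yet
        }
        long min = Long.MAX_VALUE;
        boolean found = false;
        for (long watermark : replies.values()) {
            // Only values above the previous round's aggregate are considered.
            if (watermark > lastAggregated) {
                found = true;
                min = Math.min(min, watermark);
            }
        }
        replies.clear(); // start the next round from scratch
        if (!found) {
            return OptionalLong.empty();
        }
        lastAggregated = min;
        return OptionalLong.of(min);
    }
}
```

A present result corresponds to the point at which the coordinator would send 
{{NotifyNewLowWatermark}} to the source vertex's tasks. An empty result means 
the round is either incomplete or produced no value above the last aggregate.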



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
