[ 
https://issues.apache.org/jira/browse/FLINK-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15489818#comment-15489818
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4576 at 9/14/16 8:37 AM:
---------------------------------------------------------------------

Hi [~aljoscha], thanks for the help!
I only started digging in two days ago, but yes, the 
implementation I outlined above basically follows how the 
{{CheckpointCoordinator}} and its messaging with TaskManagers are implemented.
I hope to get a PR out for this by the end of the week, maybe initially without 
tests so that I can get some early feedback from you :)

One thing I'm not entirely sure of is this part in the description:
"The messages will only be relevant to tasks that implement an internal 
{{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}} 
should implement {{LowWatermarkCooperatingTask}}."
It is sort of following the way checkpoint messages are only relevant to 
{{StatefulTask}} s, but somehow I have a feeling there might be a better layout 
for this. What do you think?
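
To make the question concrete, here is a rough sketch of the gating described in that quoted part. Every type below is a simplified, hypothetical stand-in rather than one of Flink's actual classes, and {{getCurrentLowWatermark}} is an assumed method name that the description does not specify:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: simplified stand-ins, not Flink's real task classes.
final class LowWatermarkDispatchSketch {

    /** Stand-in for any task running on a TaskManager. */
    interface Task {}

    /** Stand-in for the proposed internal interface; the method name is an assumption. */
    interface LowWatermarkCooperatingTask extends Task {
        long getCurrentLowWatermark();
    }

    /** A task that does not cooperate, e.g. a regular operator task. */
    static final class PlainTask implements Task {}

    /** A cooperating task, playing the role of SourceStreamTask. */
    static final class SourceTask implements LowWatermarkCooperatingTask {
        private final long lowWatermark;

        SourceTask(long lowWatermark) {
            this.lowWatermark = lowWatermark;
        }

        @Override
        public long getCurrentLowWatermark() {
            return lowWatermark;
        }
    }

    /** Collects replies only from tasks that implement the cooperating interface. */
    static List<Long> retrieveLowWatermarks(List<Task> tasks) {
        List<Long> replies = new ArrayList<>();
        for (Task task : tasks) {
            if (task instanceof LowWatermarkCooperatingTask) {
                replies.add(((LowWatermarkCooperatingTask) task).getCurrentLowWatermark());
            }
            // All other tasks simply ignore the message.
        }
        return replies;
    }
}
```

The {{instanceof}} check is the piece that mirrors how checkpoint messages are only relevant to {{StatefulTask}} s; an alternative layout would need to replace exactly that check.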



> Low Watermark Service in JobManager for Streaming Sources
> ---------------------------------------------------------
>
>                 Key: FLINK-4576
>                 URL: https://issues.apache.org/jira/browse/FLINK-4576
>             Project: Flink
>          Issue Type: New Feature
>          Components: JobManager, Streaming, TaskManager
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0
>
>
> As per discussion in FLINK-4341 by [~aljoscha] and [~StephanEwen], we need a 
> low watermark service in the JobManager to support transparent resharding / 
> partition discovery for our Kafka and Kinesis consumers (and any future 
> streaming connectors in general for which the external system may elastically 
> scale up and down independently of the parallelism of sources in Flink). The 
> main idea is to let source subtasks that don't emit their own watermarks 
> (because they currently don't have data partitions to consume) emit the low 
> watermark across all subtasks, instead of simply emitting a Long.MAX_VALUE 
> watermark and forbidding them to be assigned partitions in the future.
> The proposed implementation, at a high level: a {{LowWatermarkCoordinator}} 
> will be added to execution graphs, periodically triggering only the source 
> vertices with a {{RetrieveLowWatermark}} message. The tasks reply to the 
> JobManager through the actor gateway (or a new interface after FLINK-4456 
> gets merged) with a {{ReplyLowWatermark}} message. When the coordinator 
> collects all low watermarks for a particular source vertex and determines the 
> aggregated low watermark for this round (counting only values that are 
> larger than the aggregated low watermark of the last round), it sends a 
> {{NotifyNewLowWatermark}} message to the source vertex's tasks.
> The messages will only be relevant to tasks that implement an internal 
> {{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}} 
> should implement {{LowWatermarkCooperatingTask}}.
> Source functions should implement a public {{LowWatermarkListener}} interface 
> if they wish to get notified of the aggregated low watermarks across 
> subtasks. Connectors like the Kinesis consumer can choose to emit this 
> watermark if the subtask currently does not have any shards, so that 
> downstream operators may still properly advance time windows (implementation 
> for this is tracked as a separate issue).
> Overall, the service will include:
> - New messages between JobManager <-> TaskManager:
>   {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
>   {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
>   {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark, 
>   timestamp)}}
> - New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
> - New public interface {{LowWatermarkListener}} in flink-streaming-java
> - Might also need to extend {{SourceFunction.SourceContext}} to support 
>   retrieving the current low watermark of sources.
> Any feedback for this is appreciated!
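
The aggregation step in the description above could look roughly like the sketch below. It is only one literal reading of "only values that are larger than the aggregated low watermark of the last round"; the class and method names are hypothetical, and it assumes subtasks without partitions reply with Long.MIN_VALUE, which the description does not state:

```java
import java.util.Collection;

// Hypothetical sketch of one aggregation round for a single source vertex.
final class LowWatermarkRoundSketch {

    // Aggregated low watermark of the previous round; nothing aggregated yet.
    private long lastAggregated = Long.MIN_VALUE;

    /**
     * Takes the minimum over all replies that are strictly larger than the
     * previous aggregate. If no reply qualifies, the previous aggregate is kept.
     */
    long completeRound(Collection<Long> replies) {
        long min = Long.MAX_VALUE;
        boolean advanced = false;
        for (long reply : replies) {
            if (reply > lastAggregated) {
                advanced = true;
                min = Math.min(min, reply);
            }
        }
        if (advanced) {
            lastAggregated = min;
        }
        // This is the value that would go into NotifyNewLowWatermark.
        return lastAggregated;
    }
}
```

With replies of 100, 250 and Long.MIN_VALUE in the first round the aggregate becomes 100. A following round with 180, 300 and Long.MIN_VALUE moves it to 180, and a round in which every subtask reports Long.MIN_VALUE leaves it at 180.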

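On the source side, a connector that implements the proposed {{LowWatermarkListener}} might use the notification as sketched below. The callback name {{notifyNewLowWatermark}} and the {{emitted}} list (a stand-in for emitting through {{SourceFunction.SourceContext}}) are assumptions made for illustration, not part of the proposal:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of a source subtask reacting to aggregated low watermarks.
final class IdleSubtaskWatermarkSketch {

    /** Stand-in for the proposed public interface; the method name is an assumption. */
    interface LowWatermarkListener {
        void notifyNewLowWatermark(long newLowWatermark);
    }

    static final class SketchSource implements LowWatermarkListener {
        private final boolean hasShards;
        private long lastEmitted = Long.MIN_VALUE;

        // Records watermarks instead of emitting them through a real source context.
        final List<Long> emitted = new ArrayList<>();

        SketchSource(boolean hasShards) {
            this.hasShards = hasShards;
        }

        @Override
        public void notifyNewLowWatermark(long newLowWatermark) {
            // Only an idle subtask forwards the aggregate, and only if it advances.
            if (!hasShards && newLowWatermark > lastEmitted) {
                emitted.add(newLowWatermark);
                lastEmitted = newLowWatermark;
            }
        }
    }
}
```

A subtask that owns shards ignores the notification here because it derives watermarks from its own data, while an idle subtask forwards the aggregate so that downstream windows can still advance.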


