[ 
https://issues.apache.org/jira/browse/FLINK-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15617827#comment-15617827
 ] 

Tzu-Li (Gordon) Tai edited comment on FLINK-4576 at 10/29/16 9:46 AM:
----------------------------------------------------------------------

Hi [~aljoscha],

Thanks a lot for the pointers, they answer some of my doubts. I was thinking 
hard about this, as well as finishing off some other tasks, hence the late 
reply.
So, if I understand you correctly, the idea is that the current global 
watermark has to be determined from watermarks collected from the whole 
topology, and not only the operators within the chain of the source task, 
correct?

Concerning new interfaces:
I think {{currentWatermark()}} is only needed for all operators in general 
which users shouldn't need to implement and should only be internally used, but 
{{notifyCurrentGlobalWatermark()}} is needed by the actual source operator user 
implementations (so that source UDFs can determine whether or not they need to 
emit the global watermark). So I am leaning towards separate interfaces for 
this (perhaps an internal {{WatermarkOperator}} interface and a public 
{{GlobalWatermarkListener}} interface), or am I missing anything in the picture?

Concerning the global watermark collection: at what interval do you think the 
operators need be triggered to return their current watermarks, and does it 
make sense to let this interval be user configurable? Also, I'm not sure if it 
makes sense to allow multiple concurrent in-progress watermark collections 
(trigger another watermark collection from the operators even if the we haven't 
fully determined the last global watermark). 

Giving a second go at this task next week!


was (Author: tzulitai):
Hi [~aljoscha],

Thanks a lot for the pointers, they answer some of my doubts. I was thinking 
hard about this, as well as finishing off some other tasks, hence the late 
reply.
So, if I understand you correctly, the idea is that the current global 
watermark has to be determined from watermarks collected from the whole 
topology, and not only the operators within the chain of the source task, 
correct?

Concerning new interfaces:
I think {{currentWatermark()}} is only needed for all operators in general 
which users shouldn't need to implement and should only be internally used, but 
{{notifyCurrentGlobalWatermark()}} is needed by the actual source operator user 
implementations (so that source UDFs can determine whether or not they need to 
emit the global watermark). So I am leaning towards separate interfaces for 
this (perhaps an internal {{WatermarkOperator}} interface and a public 
{{GlobalWatermarkListener}} interface), or am I missing anything in the picture?

Giving a second go at this task next week!

> Low Watermark Service in JobManager for Streaming Sources
> ---------------------------------------------------------
>
>                 Key: FLINK-4576
>                 URL: https://issues.apache.org/jira/browse/FLINK-4576
>             Project: Flink
>          Issue Type: New Feature
>          Components: JobManager, Streaming, TaskManager
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0
>
>
> As per discussion in FLINK-4341 by [~aljoscha] and [~StephanEwen], we need a 
> low watermark service in the JobManager to support transparent resharding / 
> partition discovery for our Kafka and Kinesis consumers (and any future 
> streaming connectors in general for which the external system may elastically 
> scale up and down independently of the parallelism of sources in Flink). The 
> main idea is to let source subtasks that don't emit their own watermarks 
> (because they currently don't have data partitions to consume) emit the low 
> watermark across all subtasks, instead of simply emitting a Long.MAX_VALUE 
> watermark and forbidding them to be assigned partitions in the future.
> The proposed implementation, from a high-level: a {{LowWatermarkCoordinator}} 
> will be added to execution graphs, periodically triggering only the source 
> vertices with a {{RetrieveLowWatermark}} message. The tasks reply to the 
> JobManager through the actor gateway (or a new interface after FLINK-4456 
> gets merged) with a {{ReplyLowWatermark}} message. When the coordinator 
> collects all low watermarks for a particular source vertex and determines the 
> aggregated low watermark for this round (accounting only values that are 
> larger than the aggregated low watermark of the last round), it sends a 
> {{NotifyNewLowWatermark}} message to the source vertex's tasks.
> The messages will only be relevant to tasks that implement an internal 
> {{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}} 
> should implement {{LowWatermarkCooperatingTask}}.
> Source functions should implement a public {{LowWatermarkListener}} interface 
> if they wish to get notified of the aggregated low watermarks across 
> subtasks. Connectors like the Kinesis consumer can choose to emit this 
> watermark if the subtask currently does not have any shards, so that 
> downstream operators may still properly advance time windows (implementation 
> for this is tracked as a separate issue).
> Overall, the service will include -
> New messages between JobManager <-> TaskManager:
> {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
> {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
> {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark, 
> timestamp)}}
> New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
> New public interface {{LowWatermarkListener}} in flink-streaming-java
> Might also need to extend {{SourceFunction.SourceContext}} to support 
> retrieving the current low watermark of sources.
> Any feedback for this is appreciated!



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to