[
https://issues.apache.org/jira/browse/FLINK-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Tzu-Li (Gordon) Tai closed FLINK-4576.
--------------------------------------
Resolution: Won't Fix
> Low Watermark Service in JobManager for Streaming Sources
> ---------------------------------------------------------
>
> Key: FLINK-4576
> URL: https://issues.apache.org/jira/browse/FLINK-4576
> Project: Flink
> Issue Type: New Feature
> Components: JobManager, Streaming, TaskManager
> Reporter: Tzu-Li (Gordon) Tai
> Assignee: Tzu-Li (Gordon) Tai
> Priority: Blocker
> Fix For: 1.2.0
>
>
> As per discussion in FLINK-4341 by [~aljoscha] and [~StephanEwen], we need a
> low watermark service in the JobManager to support transparent resharding /
> partition discovery for our Kafka and Kinesis consumers (and any future
> streaming connectors in general for which the external system may elastically
> scale up and down independently of the parallelism of sources in Flink). The
> main idea is to let source subtasks that don't emit their own watermarks
> (because they currently don't have data partitions to consume) emit the low
> watermark across all subtasks, instead of simply emitting a Long.MAX_VALUE
> watermark and forbidding them to be assigned partitions in the future.
> The proposed implementation, at a high level: a {{LowWatermarkCoordinator}}
> will be added to execution graphs, periodically triggering only the source
> vertices with a {{RetrieveLowWatermark}} message. The tasks reply to the
> JobManager through the actor gateway (or a new interface after FLINK-4456
> gets merged) with a {{ReplyLowWatermark}} message. When the coordinator
> collects all low watermarks for a particular source vertex and determines the
> aggregated low watermark for this round (counting only values that are
> larger than the aggregated low watermark of the last round), it sends a
> {{NotifyNewLowWatermark}} message to the source vertex's tasks.
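The per-round aggregation described above could look roughly like the sketch below. It is only an illustration: the class {{LowWatermarkAggregatorSketch}} and its method names are hypothetical and do not exist in Flink, and it assumes that "aggregated" means the minimum over all replies that exceed the previous round's aggregate, which is one possible reading of the description.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

/** Hypothetical per-vertex aggregation state; not an existing Flink class. */
class LowWatermarkAggregatorSketch {
    private final int parallelism;
    private final Map<Integer, Long> replies = new HashMap<>();
    private long lastAggregated = Long.MIN_VALUE;

    LowWatermarkAggregatorSketch(int parallelism) {
        this.parallelism = parallelism;
    }

    /**
     * Records one subtask's reply. Once every subtask has replied, the round
     * is closed and the new aggregate is returned, or empty if no reply
     * exceeded the aggregate of the previous round.
     */
    OptionalLong onReply(int subtaskIndex, long currentLowWatermark) {
        replies.put(subtaskIndex, currentLowWatermark);
        if (replies.size() < parallelism) {
            return OptionalLong.empty(); // round still open
        }
        // Assumed rule: minimum over replies larger than the last aggregate.
        OptionalLong candidate = replies.values().stream()
                .mapToLong(Long::longValue)
                .filter(w -> w > lastAggregated)
                .min();
        replies.clear();
        candidate.ifPresent(w -> lastAggregated = w);
        return candidate;
    }
}
```

In this sketch a non-empty result is the point at which the coordinator would send {{NotifyNewLowWatermark}}; a subtask with no partitions can reply with Long.MIN_VALUE and is simply ignored by the filter.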
> The messages will only be relevant to tasks that implement an internal
> {{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}}
> should implement {{LowWatermarkCooperatingTask}}.
> Source functions should implement a public {{LowWatermarkListener}} interface
> if they wish to get notified of the aggregated low watermarks across
> subtasks. Connectors like the Kinesis consumer can choose to emit this
> watermark if the subtask currently does not have any shards, so that
> downstream operators may still properly advance time windows (implementation
> for this is tracked as a separate issue).
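To illustrate how a source function might use the listener, here is a minimal, Flink-independent sketch. The issue only names the {{LowWatermarkListener}} interface; the method name {{onNewLowWatermark}} and the class {{IdleAwareSubtaskSketch}} are assumptions made for this example.

```java
/** Hypothetical shape of the proposed public interface; the method name is assumed. */
interface LowWatermarkListener {
    void onNewLowWatermark(long aggregatedLowWatermark);
}

/** Hypothetical source subtask that falls back to the aggregate when it has no shards. */
class IdleAwareSubtaskSketch implements LowWatermarkListener {
    private int assignedShards;
    private long ownWatermark = Long.MIN_VALUE;
    private long aggregatedLowWatermark = Long.MIN_VALUE;

    IdleAwareSubtaskSketch(int assignedShards) {
        this.assignedShards = assignedShards;
    }

    void setAssignedShards(int assignedShards) {
        this.assignedShards = assignedShards;
    }

    /** Advances this subtask's own watermark; it never moves backwards. */
    void advanceOwnWatermark(long timestamp) {
        ownWatermark = Math.max(ownWatermark, timestamp);
    }

    @Override
    public void onNewLowWatermark(long aggregatedLowWatermark) {
        this.aggregatedLowWatermark =
                Math.max(this.aggregatedLowWatermark, aggregatedLowWatermark);
    }

    /** Own watermark while consuming shards, otherwise the aggregated low watermark. */
    long watermarkToEmit() {
        return assignedShards > 0 ? ownWatermark : aggregatedLowWatermark;
    }
}
```

The sketch leaves out what should happen when a previously idle subtask is assigned its first shard; a real connector would need to make sure the emitted watermark does not regress at that point.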
> Overall, the service will include:
> - New messages between JobManager <-> TaskManager:
>   {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
>   {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
>   {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark, timestamp)}}
> - New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
> - New public interface {{LowWatermarkListener}} in flink-streaming-java
> We might also need to extend {{SourceFunction.SourceContext}} to support
> retrieving the current low watermark of sources.
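As a rough picture of the three messages listed above, they could be modelled as small immutable classes. This is a sketch only: the field types are assumptions (plain strings stand in for Flink's ID types), the shared base class {{LowWatermarkMessage}} and the helper {{ReplyLowWatermark.to}} are hypothetical, and {{Serializable}} is implemented on the assumption that the messages travel between JobManager and TaskManager.

```java
import java.io.Serializable;

/** Hypothetical common base carrying the addressing fields of all three messages. */
abstract class LowWatermarkMessage implements Serializable {
    private static final long serialVersionUID = 1L;
    final String jobId;
    final String jobVertexId;
    final String taskId;

    LowWatermarkMessage(String jobId, String jobVertexId, String taskId) {
        this.jobId = jobId;
        this.jobVertexId = jobVertexId;
        this.taskId = taskId;
    }
}

/** JobManager -> task: asks a source task for its current low watermark. */
final class RetrieveLowWatermark extends LowWatermarkMessage {
    final long timestamp;

    RetrieveLowWatermark(String jobId, String jobVertexId, String taskId, long timestamp) {
        super(jobId, jobVertexId, taskId);
        this.timestamp = timestamp;
    }
}

/** Task -> JobManager: reports the task's current low watermark. */
final class ReplyLowWatermark extends LowWatermarkMessage {
    final long currentLowWatermark;

    ReplyLowWatermark(String jobId, String jobVertexId, String taskId, long currentLowWatermark) {
        super(jobId, jobVertexId, taskId);
        this.currentLowWatermark = currentLowWatermark;
    }

    /** Hypothetical helper: builds the reply addressed like the request. */
    static ReplyLowWatermark to(RetrieveLowWatermark request, long currentLowWatermark) {
        return new ReplyLowWatermark(
                request.jobId, request.jobVertexId, request.taskId, currentLowWatermark);
    }
}

/** JobManager -> task: announces the aggregated low watermark of a round. */
final class NotifyNewLowWatermark extends LowWatermarkMessage {
    final long newLowWatermark;
    final long timestamp;

    NotifyNewLowWatermark(String jobId, String jobVertexId, String taskId,
                          long newLowWatermark, long timestamp) {
        super(jobId, jobVertexId, taskId);
        this.newLowWatermark = newLowWatermark;
        this.timestamp = timestamp;
    }
}
```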
> Any feedback for this is appreciated!