[ https://issues.apache.org/jira/browse/FLINK-4576?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15631848#comment-15631848 ]

Tzu-Li (Gordon) Tai edited comment on FLINK-4576 at 11/3/16 7:10 AM:
---------------------------------------------------------------------

I want to step back to the analysis of the original problem, to make sure we're 
going for a reasonable solution:

I think the only problem we were facing is that our own sources, Kafka and 
Kinesis in particular, may have subtasks that are initially idle but may 
eventually still have data to read when partition / shard counts scale up. 
Previously, we sent {{Long.MAX_VALUE}} watermarks from these idle subtasks 
downstream to ensure that downstream operators could still advance their 
watermarks.
However, due to FLINK-4022 and FLINK-4341, we don't really want to do that, 
because it breaks the possibility of letting idle subtasks transparently accept 
new partitions / shards to read from without messing up the watermarks 
downstream.

To solve this, I'm considering another approach:
- Introduce a new special watermark, a subclass of {{Watermark}}. When a 
downstream operator receives it from one of its input channels, the operator 
marks that channel as temporarily "watermark idle" and does not take it into 
account when determining whether to advance its timers.
- This special watermark is not forwarded downstream. It stops at the operator 
that directly receives it, signalling that one of its input channels will stop 
sending watermarks indefinitely and should not be accounted for when advancing 
watermarks.
- When an input channel that was previously marked as "watermark idle" sends a 
normal watermark again, the behaviour reverts to what it was before.
- Only sources should emit this special watermark; it shouldn't be used 
anywhere else.
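To make the idea concrete, here is a minimal, self-contained sketch of the per-channel "watermark idle" bookkeeping described above. All names ({{WatermarkTracker}}, {{onIdleMarker}}) are illustrative, not actual Flink API; the special watermark is modelled as a separate callback rather than a {{Watermark}} subclass:

```java
import java.util.Arrays;

// Hypothetical sketch: tracks per-input-channel watermarks, ignoring
// channels that were marked idle by the proposed special watermark.
class WatermarkTracker {
    private final long[] channelWatermarks;
    private final boolean[] channelIdle;

    WatermarkTracker(int numChannels) {
        channelWatermarks = new long[numChannels];
        Arrays.fill(channelWatermarks, Long.MIN_VALUE);
        channelIdle = new boolean[numChannels];
    }

    // Called when the special watermark arrives: mark the channel idle.
    // Nothing is forwarded downstream.
    void onIdleMarker(int channel) {
        channelIdle[channel] = true;
    }

    // Called for a normal watermark: the channel becomes active again.
    // Returns the new combined watermark, skipping idle channels.
    long onWatermark(int channel, long timestamp) {
        channelIdle[channel] = false;
        channelWatermarks[channel] = Math.max(channelWatermarks[channel], timestamp);
        long min = Long.MAX_VALUE;
        boolean anyActive = false;
        for (int i = 0; i < channelWatermarks.length; i++) {
            if (!channelIdle[i]) {
                anyActive = true;
                min = Math.min(min, channelWatermarks[i]);
            }
        }
        return anyActive ? min : Long.MIN_VALUE;
    }
}
```

Note how a channel marked idle simply drops out of the min-computation, so a single idle source subtask no longer pins the combined watermark at {{Long.MIN_VALUE}}.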

There is of course the problem that user-defined watermark generation in the 
middle of the topology can still stall or stop emitting watermarks, but I 
think it is reasonable to have a contract with user-provided timestamp / 
watermark assigners that their implementations should not cause this.
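As an illustration of an assigner that honours such a contract, the sketch below keeps the watermark advancing from the highest timestamp seen so far (bounded out-of-orderness) instead of stalling. This is a minimal stand-in for the idea, not the actual Flink assigner interface:

```java
// Illustrative timestamp/watermark assigner that never "goes silent":
// its periodic watermark is always derived from the max seen timestamp.
class BoundedLatenessAssigner {
    private final long maxOutOfOrderness;
    private long maxSeenTimestamp = Long.MIN_VALUE;

    BoundedLatenessAssigner(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    // Record an element's timestamp; returns it unchanged.
    long extractTimestamp(long elementTimestamp) {
        maxSeenTimestamp = Math.max(maxSeenTimestamp, elementTimestamp);
        return elementTimestamp;
    }

    // Called periodically; always returns a watermark rather than stalling,
    // even when no new elements have arrived since the last call.
    long getCurrentWatermark() {
        return maxSeenTimestamp == Long.MIN_VALUE
                ? Long.MIN_VALUE
                : maxSeenTimestamp - maxOutOfOrderness;
    }
}
```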

[~aljoscha] what do you think?



> Low Watermark Service in JobManager for Streaming Sources
> ---------------------------------------------------------
>
>                 Key: FLINK-4576
>                 URL: https://issues.apache.org/jira/browse/FLINK-4576
>             Project: Flink
>          Issue Type: New Feature
>          Components: JobManager, Streaming, TaskManager
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Blocker
>             Fix For: 1.2.0
>
>
> As per discussion in FLINK-4341 by [~aljoscha] and [~StephanEwen], we need a 
> low watermark service in the JobManager to support transparent resharding / 
> partition discovery for our Kafka and Kinesis consumers (and any future 
> streaming connectors in general for which the external system may elastically 
> scale up and down independently of the parallelism of sources in Flink). The 
> main idea is to let source subtasks that don't emit their own watermarks 
> (because they currently don't have data partitions to consume) emit the low 
> watermark across all subtasks, instead of simply emitting a Long.MAX_VALUE 
> watermark and forbidding them to be assigned partitions in the future.
> The proposed implementation, from a high-level: a {{LowWatermarkCoordinator}} 
> will be added to execution graphs, periodically triggering only the source 
> vertices with a {{RetrieveLowWatermark}} message. The tasks reply to the 
> JobManager through the actor gateway (or a new interface after FLINK-4456 
> gets merged) with a {{ReplyLowWatermark}} message. When the coordinator 
> collects all low watermarks for a particular source vertex and determines the 
> aggregated low watermark for this round (accounting only values that are 
> larger than the aggregated low watermark of the last round), it sends a 
> {{NotifyNewLowWatermark}} message to the source vertex's tasks.
> The messages will only be relevant to tasks that implement an internal 
> {{LowWatermarkCooperatingTask}} interface. For now, only {{SourceStreamTask}} 
> should implement {{LowWatermarkCooperatingTask}}.
> Source functions should implement a public {{LowWatermarkListener}} interface 
> if they wish to get notified of the aggregated low watermarks across 
> subtasks. Connectors like the Kinesis consumer can choose to emit this 
> watermark if the subtask currently does not have any shards, so that 
> downstream operators may still properly advance time windows (implementation 
> for this is tracked as a separate issue).
> Overall, the service will include -
> New messages between JobManager <-> TaskManager:
> {{RetrieveLowWatermark(jobId, jobVertexId, taskId, timestamp)}}
> {{ReplyLowWatermark(jobId, jobVertexId, taskId, currentLowWatermark)}}
> {{NotifyNewLowWatermark(jobId, jobVertexId, taskId, newLowWatermark, 
> timestamp)}}
> New internal task interface {{LowWatermarkCooperatingTask}} in flink-runtime
> New public interface {{LowWatermarkListener}} in flink-streaming-java
> Might also need to extend {{SourceFunction.SourceContext}} to support 
> retrieving the current low watermark of sources.
> Any feedback for this is appreciated!
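
The aggregation round described in the quoted proposal could be sketched as follows. The class name mirrors the proposed {{LowWatermarkCoordinator}}, but this is an illustrative stand-in only, assuming the "accounting only values that are larger than the aggregated low watermark of the last round" rule means stale replies are dropped before taking the minimum:

```java
import java.util.List;

// Hypothetical sketch of one aggregation round in the proposed coordinator:
// collect ReplyLowWatermark values from all source subtasks, drop values not
// larger than last round's result, and advance the aggregate monotonically.
class LowWatermarkCoordinator {
    private long lastAggregatedLowWatermark = Long.MIN_VALUE;

    long aggregateRound(List<Long> replies) {
        long min = Long.MAX_VALUE;
        boolean advanced = false;
        for (long wm : replies) {
            // Per the proposal, ignore replies that would move time backwards.
            if (wm > lastAggregatedLowWatermark) {
                min = Math.min(min, wm);
                advanced = true;
            }
        }
        if (advanced) {
            lastAggregatedLowWatermark = min;
        }
        // This value would be sent back via NotifyNewLowWatermark.
        return lastAggregatedLowWatermark;
    }
}
```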



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
