[ 
https://issues.apache.org/jira/browse/FLINK-17899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen closed FLINK-17899.
--------------------------------

> Integrate FLIP-126 Timestamps and Watermarking with FLIP-27 sources
> -------------------------------------------------------------------
>
>                 Key: FLINK-17899
>                 URL: https://issues.apache.org/jira/browse/FLINK-17899
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> *Preamble:* This whole discussion is, to some extent, only necessary because
> in the {{SourceReader}} we pass the {{SourceOutput}} as a parameter to the
> {{pollNext(...)}} method. However, this design follows from a deeper runtime
> pipeline design and is not easy to change at this stage.
>  
> There are some principal design choices here:
>  
> *(1) Do we make Timestamps and Watermarks purely a feature of the library
> (ConnectorBase), or do we integrate them with the core (SourceOperator)?*
> Making it purely a responsibility of the ConnectorBase would have the 
> advantage of keeping the SourceOperator simple. However, there is value in 
> integrating this with the SourceOperator.
>  - Implementations that are not using the ConnectorBase (like simple
> collection- or iterator-based sources) would automatically get access to the
> pluggable TimestampExtractors and WatermarkGenerators.
>  - When executing batch programs, the SourceOperator can transparently inject
> a "no-op" WatermarkGenerator to make sure no Watermarks are generated during
> the batch execution. Given that batch sources are very performance sensitive,
> it seems better not to run the watermark generator logic at all than to drop
> the watermarks later.
>  - In a future version, we may want to implement "global watermark holds"
> generated by the Enumerators: the enumerator tells the readers how far they
> may advance their local watermarks. This helps avoid prematurely advancing
> the watermark based on one split's records when other splits contain data
> overlapping with older time ranges. The streaming file source is a common
> example of this.
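The batch optimization from the second bullet above can be sketched as follows. This is a minimal, hypothetical illustration: `WatermarkOutput`, `WatermarkGenerator`, `MaxTimestampWatermarkGenerator`, `NoOpWatermarkGenerator`, and `WatermarkGenerators.forMode` are simplified stand-ins invented here, loosely modeled on FLIP-126, and are not Flink's actual classes. The operator picks the generator once, based on the execution mode, so in batch mode the per-record generator call does no work instead of producing watermarks that are dropped later.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the channel through which watermarks are emitted.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

// Hypothetical stand-in for a FLIP-126 style watermark generator (event-driven only).
interface WatermarkGenerator<T> {
    void onEvent(T event, long timestamp, WatermarkOutput output);
}

// Streaming: emits a new watermark whenever the highest timestamp seen so far advances.
final class MaxTimestampWatermarkGenerator<T> implements WatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long timestamp, WatermarkOutput output) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
            output.emitWatermark(maxTimestamp);
        }
    }
}

// Batch: does no work per record, so no watermarks are created (and none need dropping).
final class NoOpWatermarkGenerator<T> implements WatermarkGenerator<T> {
    @Override
    public void onEvent(T event, long timestamp, WatermarkOutput output) {
        // intentionally empty
    }
}

final class WatermarkGenerators {
    // The operator, not the connector, chooses the generator based on the execution mode.
    static <T> WatermarkGenerator<T> forMode(boolean batch, WatermarkGenerator<T> userGenerator) {
        return batch ? new NoOpWatermarkGenerator<T>() : userGenerator;
    }

    // Feeds timestamps through a generator and collects the watermarks it emits.
    static List<Long> run(WatermarkGenerator<String> generator, long[] timestamps) {
        List<Long> emitted = new ArrayList<>();
        for (long ts : timestamps) {
            generator.onEvent("record", ts, emitted::add);
        }
        return emitted;
    }
}
```

With timestamps 1, 5, 3, 7, the streaming generator emits watermarks 1, 5, and 7, while the batch generator emits nothing, and the user-supplied generator never runs in batch mode.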
>  
> *(2) Is the per-partition watermarking purely a feature of the library
> (ConnectorBase), or do we integrate it with the core (SourceOperator)?*
> I believe we need to solve this on the same level as the previous question:
>  - Once a connector instantiates the per-partition watermark generators, the
> main output (through which the SourceReader emits the records) must not run
> its own watermark generator anymore. Otherwise we would also extract
> watermarks on the merged stream, where the records of all splits are
> interleaved, which defeats the per-partition watermarking. So keeping the
> per-partition watermark generators purely in the ConnectorBase and emitting
> transparently through an unchanged main output would not work.
>  - So, if we decide to implement watermark support in the core
> (SourceOperator), we would need to offer the per-partition watermarking
> utilities on that level as well.
>  - Along similar lines as the previous point, batch execution can optimize
> the watermark extraction by supplying no-op extractors for the per-partition
> extractors as well (which will most likely bear the bulk of the load in the
> connectors).
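Why a generator on the merged stream is harmful can be made concrete with a small simulation. This is a hypothetical sketch: `SplitRecord` and `WatermarkComparison` are illustrative names, each split's watermark is simplified to its highest timestamp seen so far, and the per-split watermarks are assumed to be combined by taking their minimum (the usual rule for combining watermarks of parallel inputs). It counts how many records arrive behind the current watermark in each setup.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// One record as seen by the reader: the split it came from and its event timestamp.
final class SplitRecord {
    final String splitId;
    final long timestamp;

    SplitRecord(String splitId, long timestamp) {
        this.splitId = splitId;
        this.timestamp = timestamp;
    }
}

final class WatermarkComparison {
    // One generator on the merged stream: the watermark follows the highest timestamp
    // seen across all splits. Records strictly behind the watermark count as late.
    static int lateRecordsWithMergedWatermark(List<SplitRecord> records) {
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (SplitRecord r : records) {
            if (r.timestamp < watermark) {
                late++;
            }
            watermark = Math.max(watermark, r.timestamp);
        }
        return late;
    }

    // One generator per split: each split tracks its own highest timestamp, and the
    // reader's watermark is the minimum over all assigned splits (never moving backwards).
    static int lateRecordsWithPerSplitWatermarks(List<SplitRecord> records, List<String> assignedSplits) {
        Map<String, Long> perSplit = new HashMap<>();
        for (String splitId : assignedSplits) {
            perSplit.put(splitId, Long.MIN_VALUE); // no progress yet: holds the watermark back
        }
        long watermark = Long.MIN_VALUE;
        int late = 0;
        for (SplitRecord r : records) {
            if (r.timestamp < watermark) {
                late++;
            }
            perSplit.merge(r.splitId, r.timestamp, Long::max);
            long min = Long.MAX_VALUE;
            for (long splitWatermark : perSplit.values()) {
                min = Math.min(min, splitWatermark);
            }
            watermark = Math.max(watermark, min);
        }
        return late;
    }
}
```

For the records `a@10, a@11, b@2, b@3, a@12, b@4` from two assigned splits `a` and `b`, the merged-stream generator marks all three `b` records as late, while the per-split variant marks none, because split `b` holds the combined watermark back.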
>  
> *(3) What would an integration of WatermarkGenerators with the SourceOperator
> look like?*
> This is rather straightforward: the SourceOperator instantiates a SourceOutput
> that internally runs the timestamp extractor and watermark generator and emits
> to the operator's DataOutput.
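A minimal sketch of such a SourceOutput, assuming simplified stand-ins for the runtime's DataOutput and for FLIP-126's timestamp assigner and watermark generator. All type names here (`StreamElementSink`, `TimestampAssigner`, `WatermarkOutput`, `WatermarkGenerator`, `TimestampsAndWatermarksOutput`) are illustrative rather than Flink's real classes, and periodic watermark emission is left out. For each record, the output extracts a timestamp, forwards the record with that timestamp, and then lets the generator observe the event and emit watermarks to the same sink.

```java
// Hypothetical stand-in for the operator's DataOutput: receives records and watermarks.
interface StreamElementSink<T> {
    void emitRecord(T record, long timestamp);

    void emitWatermark(long watermark);
}

// Hypothetical stand-in for a timestamp extractor; may use the source-provided timestamp.
interface TimestampAssigner<T> {
    long extractTimestamp(T record, long sourceTimestamp);
}

// Hypothetical stand-in for the channel a generator emits watermarks through.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

// Hypothetical stand-in for a watermark generator (event-driven only, no periodic emission).
interface WatermarkGenerator<T> {
    void onEvent(T record, long timestamp, WatermarkOutput output);
}

// The SourceOutput the operator would hand to the reader: it runs the timestamp
// assigner and watermark generator and emits everything to the operator's output.
final class TimestampsAndWatermarksOutput<T> {
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    private final TimestampAssigner<T> assigner;
    private final WatermarkGenerator<T> generator;
    private final StreamElementSink<T> sink;

    TimestampsAndWatermarksOutput(
            TimestampAssigner<T> assigner, WatermarkGenerator<T> generator, StreamElementSink<T> sink) {
        this.assigner = assigner;
        this.generator = generator;
        this.sink = sink;
    }

    // Record without a source timestamp: the assigner alone decides.
    void collect(T record) {
        collect(record, NO_TIMESTAMP);
    }

    // Record with a timestamp supplied by the source system.
    void collect(T record, long sourceTimestamp) {
        long timestamp = assigner.extractTimestamp(record, sourceTimestamp);
        sink.emitRecord(record, timestamp);
        generator.onEvent(record, timestamp, sink::emitWatermark);
    }
}
```

Because the generator is a constructor argument, the operator could pass a no-op generator in batch mode without the reader noticing any difference.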
>  
> *(4) What would an integration of the per-split WatermarkGenerators look like?*
> I would propose to introduce an interface {{ReaderMainOutput}} which extends
> {{SourceOutput}} and adds methods to create and release per-split outputs. The
> {{SourceReader}} should accept a {{ReaderMainOutput}} instead of a
> {{SourceOutput}}.
>  
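> {code:java}
> import org.apache.flink.api.connector.source.SourceOutput;
>
> public interface ReaderMainOutput<T> extends SourceOutput<T> {
>    // Records emitted directly through the main output (not split-aware).
>    @Override
>    void collect(T record);
>    @Override
>    void collect(T record, long timestamp);
>    // Creates an output whose records run through a dedicated per-split watermark generator.
>    SourceOutput<T> createOutputForSplit(String splitId);
>    // Releases the output created for the given split, e.g. once the split is finished.
>    void releaseOutputForSplit(String splitId);
> }
> {code}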
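To illustrate how a reader might use the proposed interface, here is a hypothetical implementation. The `SourceOutput` below is a reduced stand-in for the FLIP-27 interface (only the two `collect` methods), `ReaderMainOutput` is repeated from the proposal so the sketch compiles on its own, and `PerSplitWatermarkingOutput` is an invented class that records emitted elements in a list. Each split output tracks its own watermark (simplified to the highest timestamp seen), the main output emits the minimum over all active splits without running a generator of its own, and releasing a split stops it from holding the watermark back.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Reduced stand-in for FLIP-27's SourceOutput (only the two collect methods).
interface SourceOutput<T> {
    void collect(T record);

    void collect(T record, long timestamp);
}

// The interface proposed above, repeated so that this sketch compiles on its own.
interface ReaderMainOutput<T> extends SourceOutput<T> {
    @Override
    void collect(T record);

    @Override
    void collect(T record, long timestamp);

    SourceOutput<T> createOutputForSplit(String splitId);

    void releaseOutputForSplit(String splitId);
}

// Hypothetical implementation: each split output tracks its own watermark (simplified to
// the highest timestamp seen), and the main output emits the minimum over active splits.
final class PerSplitWatermarkingOutput<T> implements ReaderMainOutput<T> {
    final List<String> emitted = new ArrayList<>(); // records and watermarks, in emission order
    private final Map<String, Long> splitWatermarks = new HashMap<>();
    private long lastWatermark = Long.MIN_VALUE;

    @Override
    public void collect(T record) {
        emitted.add(String.valueOf(record));
    }

    @Override
    public void collect(T record, long timestamp) {
        // No generator on the main output: it would see all splits interleaved.
        emitted.add(record + "@" + timestamp);
    }

    @Override
    public SourceOutput<T> createOutputForSplit(String splitId) {
        splitWatermarks.putIfAbsent(splitId, Long.MIN_VALUE); // a new split holds the watermark back
        return new SourceOutput<T>() {
            @Override
            public void collect(T record) {
                PerSplitWatermarkingOutput.this.collect(record);
            }

            @Override
            public void collect(T record, long timestamp) {
                PerSplitWatermarkingOutput.this.collect(record, timestamp);
                splitWatermarks.merge(splitId, timestamp, Long::max);
                maybeEmitWatermark();
            }
        };
    }

    @Override
    public void releaseOutputForSplit(String splitId) {
        splitWatermarks.remove(splitId); // a released split no longer holds the watermark back
        maybeEmitWatermark();
    }

    // Emits the minimum per-split watermark if it advanced past the last emitted one.
    private void maybeEmitWatermark() {
        if (splitWatermarks.isEmpty()) {
            return;
        }
        long min = Long.MAX_VALUE;
        for (long splitWatermark : splitWatermarks.values()) {
            min = Math.min(min, splitWatermark);
        }
        if (min > lastWatermark) {
            lastWatermark = min;
            emitted.add("W" + min);
        }
    }
}
```

In this sketch, a reader would call `createOutputForSplit` when it is assigned a split, emit that split's records through the returned output, and call `releaseOutputForSplit` once the split is finished.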



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
