[ 
https://issues.apache.org/jira/browse/FLINK-17899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stephan Ewen resolved FLINK-17899.
----------------------------------
    Resolution: Fixed

Fixed in

1.11.0 via 
  - b3031accafc9ff1d6a3ee16e28f05d0df9c2da22
  - c0163d29d25555e4552b2b88d1ca0aabad46edbe
  - ae596d5338f8ff060f26641657239bacac2712b2
  - ac78993878826cc3bd982940b7a115797f07895c
  - 2e4a7fe3ed46a729420d12db3e714f594f84d03d

1.12.0 (master) via
  - 5070ee9075cda6cb4048636ee1a9f3b384166fc7
  - 4bec7fb642c9402ac2ee73354b66e60cfa92389b
  - 54f1a4c8071a6d71111185449e795b2f00fa49e9
  - 31d669e7c6d0cad88f31a3b6ca4fc5e7d178ddda
  - 4fdcbcea3e3968eec6b3b225458e2ce7b7ada1bd


> Integrate FLIP-126 Timestamps and Watermarking with FLIP-27 sources
> -------------------------------------------------------------------
>
>                 Key: FLINK-17899
>                 URL: https://issues.apache.org/jira/browse/FLINK-17899
>             Project: Flink
>          Issue Type: Sub-task
>          Components: API / DataStream
>            Reporter: Stephan Ewen
>            Assignee: Stephan Ewen
>            Priority: Major
>              Labels: pull-request-available
>             Fix For: 1.11.0
>
>
> *Preambel:* This whole discussion is to some extend only necessary, because 
> in the {{SourceReader}}, we pass the {{SourceOutput}} as a parameter to the 
> {{pollNext(...)}} method. However, this design follows some deeper runtime 
> pipeline design, and is not easy to change at this stage.
>  
> There are some principle design choices here:
>  
> *(1) Do we make Timestamps and Watermarks purely a feature of the library 
> (ConnectorBase), or do we integrate it with the core (SourceOperator).*
> Making it purely a responsibility of the ConnectorBase would have the 
> advantage of keeping the SourceOperator simple. However, there is value in 
> integrating this with the SourceOperator.
>  - Implementations that are not using the ConnectorBase (like simple 
> collection- or iterator-based sources) would automatically get access to the 
> plug-able TimestampExtractors and WatermarkGenerators.
>  - When executing batch programs, the SourceOperator can transparently inject 
> a "no-op" WatermarkGenerator so make sure no Watermarks are generated during 
> the batch execution. Given that batch sources are very performance sensitive, 
> it seems useful to not even run the watermark generator logic, rather than 
> later dropping the watermarks.
>  - In a future version, we may want to implement "global watermark holds" 
> generated my the Enumerators: The enumerator tells the readers how far they 
> may advance their local watermarks. This can help to not prematurely advance 
> the watermark based on a split's records when other splits have data 
> overlapping with older ranges. An example where this is commonly the case is 
> the streaming file source.
>  
> *(2) Is the per-partition watermarking purely a feature of the library 
> (ConnectorBase), or do we integrate it with the core (SourceOperator).*
> I believe we need to solve this on the same level as the previous question:
>  - Once a connector instantiates the per-partition watermark generators, the 
> main output (through which the SourceReader emits the records) must not run 
> its watermark generator any more. Otherwise we extract watermarks also on the 
> merged stream, which messes things up. So having the per-partition watermark 
> generators simply in the ConnectorBase and emit transparently through an 
> unchanged main output would not work.
>  - So, if we decide to implement watermarks support in the core 
> (SourceOperator), we would need to offer the per-partition watermarking 
> utilities on that level as well.
>  - Along a similar line of thoughts as in the previous point, the batch 
> execution can optimize the watermark extraction by supplying no-op extractors 
> also for the per-partition extractors (which will most likely bear the bulk 
> of the load in the connectors).
>  
> *(3) How would an integration of WatermarkGenerators with the SourceOperator 
> look like?*
> Rather straightforward, the SourceOperator instantiates a SourceOutput that 
> internally runs the timestamp extractor and watermark generator and emits to 
> the DataOutput that the operator emits to.
>  
> *(4) How would an integration of the per-split WatermarkGenerators look like?*
> I would propose to introduce a class {{ReaderMainOutput}} which extends 
> {{SourceOutput}} and. The {{SourceReader}} should accept a 
> {{ReaderMainOutput}} instead of a {{SourceOutput}}.
>  
> {code:java}
> public interface ReaderMainOutput<T> extends SourceOutput<T> {
>    @Override
>    void collect(T record);
>    @Override
>    void collect(T record, long timestamp);
>    SourceOutput<T> createOutputForSplit(String splitId);
>    void releaseOutputForSplit(String splitId);
> }
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to