[
https://issues.apache.org/jira/browse/FLINK-17899?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Stephan Ewen resolved FLINK-17899.
----------------------------------
Resolution: Fixed
Fixed in
1.11.0 via
- b3031accafc9ff1d6a3ee16e28f05d0df9c2da22
- c0163d29d25555e4552b2b88d1ca0aabad46edbe
- ae596d5338f8ff060f26641657239bacac2712b2
- ac78993878826cc3bd982940b7a115797f07895c
- 2e4a7fe3ed46a729420d12db3e714f594f84d03d
1.12.0 (master) via
- 5070ee9075cda6cb4048636ee1a9f3b384166fc7
- 4bec7fb642c9402ac2ee73354b66e60cfa92389b
- 54f1a4c8071a6d71111185449e795b2f00fa49e9
- 31d669e7c6d0cad88f31a3b6ca4fc5e7d178ddda
- 4fdcbcea3e3968eec6b3b225458e2ce7b7ada1bd
> Integrate FLIP-126 Timestamps and Watermarking with FLIP-27 sources
> -------------------------------------------------------------------
>
> Key: FLINK-17899
> URL: https://issues.apache.org/jira/browse/FLINK-17899
> Project: Flink
> Issue Type: Sub-task
> Components: API / DataStream
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Major
> Labels: pull-request-available
> Fix For: 1.11.0
>
>
> *Preamble:* This whole discussion is to some extent only necessary because
> in the {{SourceReader}}, we pass the {{SourceOutput}} as a parameter to the
> {{pollNext(...)}} method. However, this design follows some deeper runtime
> pipeline design, and is not easy to change at this stage.
>
> There are some principal design choices here:
>
> *(1) Do we make Timestamps and Watermarks purely a feature of the library
> (ConnectorBase), or do we integrate it with the core (SourceOperator)?*
> Making it purely a responsibility of the ConnectorBase would have the
> advantage of keeping the SourceOperator simple. However, there is value in
> integrating this with the SourceOperator.
> - Implementations that are not using the ConnectorBase (like simple
> collection- or iterator-based sources) would automatically get access to the
> pluggable TimestampExtractors and WatermarkGenerators.
> - When executing batch programs, the SourceOperator can transparently inject
> a "no-op" WatermarkGenerator to make sure no Watermarks are generated during
> the batch execution. Given that batch sources are very performance sensitive,
> it seems useful to not even run the watermark generator logic, rather than
> later dropping the watermarks.
> - In a future version, we may want to implement "global watermark holds"
> generated by the Enumerators: The enumerator tells the readers how far they
> may advance their local watermarks. This can help to not prematurely advance
> the watermark based on a split's records when other splits have data
> overlapping with older ranges. An example where this is commonly the case is
> the streaming file source.
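
A minimal sketch of the batch-mode point above, assuming hypothetical stand-in types: `SketchWatermarkGenerator`, `AscendingWatermarks`, `NoOpWatermarks`, and `SketchSourceOperator` are invented for illustration and are not Flink's actual interfaces. It only shows the idea of swapping in a no-op generator so that no watermark logic runs per record in batch execution.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a watermark generator; not Flink's actual interface. */
interface SketchWatermarkGenerator<T> {
    /** Called once per record; may append a watermark to the given sink. */
    void onEvent(T event, long timestamp, List<Long> watermarkSink);
}

/** Emits a watermark whenever the maximum timestamp seen so far advances. */
final class AscendingWatermarks<T> implements SketchWatermarkGenerator<T> {
    private long maxTimestamp = Long.MIN_VALUE;

    @Override
    public void onEvent(T event, long timestamp, List<Long> watermarkSink) {
        if (timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
            watermarkSink.add(timestamp);
        }
    }
}

/** Does nothing, so batch execution never produces a watermark. */
final class NoOpWatermarks<T> implements SketchWatermarkGenerator<T> {
    @Override
    public void onEvent(T event, long timestamp, List<Long> watermarkSink) {
        // Intentionally empty: nothing is tracked and nothing is emitted.
    }
}

/** Hypothetical operator-side wiring: picks the generator based on the execution mode. */
final class SketchSourceOperator<T> {
    private final SketchWatermarkGenerator<T> generator;
    final List<Long> emittedWatermarks = new ArrayList<>();

    SketchSourceOperator(SketchWatermarkGenerator<T> userGenerator, boolean batchMode) {
        // In batch mode the user's generator is replaced transparently.
        this.generator = batchMode ? new NoOpWatermarks<T>() : userGenerator;
    }

    void emit(T record, long timestamp) {
        generator.onEvent(record, timestamp, emittedWatermarks);
    }
}
```

With `batchMode` set to `true`, `emittedWatermarks` stays empty no matter which generator the user supplied, so nothing has to be dropped later.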
>
> *(2) Is the per-partition watermarking purely a feature of the library
> (ConnectorBase), or do we integrate it with the core (SourceOperator)?*
> I believe we need to solve this on the same level as the previous question:
> - Once a connector instantiates the per-partition watermark generators, the
> main output (through which the SourceReader emits the records) must not run
> its watermark generator any more. Otherwise we extract watermarks also on the
> merged stream, which messes things up. So having the per-partition watermark
> generators simply in the ConnectorBase and emitting transparently through an
> unchanged main output would not work.
> - So, if we decide to implement watermarks support in the core
> (SourceOperator), we would need to offer the per-partition watermarking
> utilities on that level as well.
> - Along a similar line of thought as in the previous point, the batch
> execution can optimize the watermark extraction by supplying no-op extractors
> also for the per-partition extractors (which will most likely bear the bulk
> of the load in the connectors).
>
> *(3) What would an integration of WatermarkGenerators with the SourceOperator
> look like?*
> Rather straightforward: the SourceOperator instantiates a SourceOutput that
> internally runs the timestamp extractor and watermark generator and emits to
> the DataOutput that the operator emits to.
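
A rough sketch of what point (3) describes, under stated assumptions: `RecordingDataOutput` and `TimestampingOutput` are hypothetical names, and the "watermark equals the highest timestamp seen" policy is a placeholder for whatever generator is configured. It is meant only to show an output wrapper that extracts timestamps, generates watermarks, and forwards both downstream.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.ToLongFunction;

/** Hypothetical downstream sink standing in for the operator's DataOutput. */
final class RecordingDataOutput<T> {
    final List<String> events = new ArrayList<>();

    void emitRecord(T record, long timestamp) {
        events.add("record(" + record + "@" + timestamp + ")");
    }

    void emitWatermark(long watermark) {
        events.add("watermark(" + watermark + ")");
    }
}

/** Hypothetical output handed to the reader; hides timestamping and watermarking. */
final class TimestampingOutput<T> {
    private final ToLongFunction<T> timestampExtractor;
    private final RecordingDataOutput<T> downstream;
    private long currentWatermark = Long.MIN_VALUE;

    TimestampingOutput(ToLongFunction<T> timestampExtractor, RecordingDataOutput<T> downstream) {
        this.timestampExtractor = timestampExtractor;
        this.downstream = downstream;
    }

    /** No timestamp supplied by the source: fall back to the extractor. */
    void collect(T record) {
        collect(record, timestampExtractor.applyAsLong(record));
    }

    /** Forwards the record, then emits a watermark if the timestamp advanced it. */
    void collect(T record, long timestamp) {
        downstream.emitRecord(record, timestamp);
        if (timestamp > currentWatermark) {
            currentWatermark = timestamp;
            downstream.emitWatermark(currentWatermark);
        }
    }
}
```

The reader only ever calls `collect(...)`; the timestamp and watermark handling stays inside the output it was given.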
>
> *(4) What would an integration of the per-split WatermarkGenerators look like?*
> I would propose to introduce a class {{ReaderMainOutput}} which extends
> {{SourceOutput}} and adds methods to create and release per-split outputs.
> The {{SourceReader}} should accept a {{ReaderMainOutput}} instead of a
> {{SourceOutput}}.
>
> {code:java}
> public interface ReaderMainOutput<T> extends SourceOutput<T> {
>     @Override
>     void collect(T record);
>
>     @Override
>     void collect(T record, long timestamp);
>
>     SourceOutput<T> createOutputForSplit(String splitId);
>
>     void releaseOutputForSplit(String splitId);
> }
> {code}
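
One way the per-split outputs could feed a single operator-level watermark is sketched below. This is an assumption, not something the issue specifies: `SplitWatermarkTracker` and `onSplitWatermark` are hypothetical, and combining split-local watermarks by taking their minimum is a common choice that the text above does not prescribe. The `createOutputForSplit` and `releaseOutputForSplit` methods here only mirror the names in the proposed interface and do not return a `SourceOutput`.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical tracker that combines per-split watermarks by taking their minimum. */
final class SplitWatermarkTracker {
    private final Map<String, Long> splitWatermarks = new HashMap<>();
    private long emitted = Long.MIN_VALUE;

    /** Mirrors createOutputForSplit: registers a split that has not advanced yet. */
    void createOutputForSplit(String splitId) {
        splitWatermarks.putIfAbsent(splitId, Long.MIN_VALUE);
    }

    /** Mirrors releaseOutputForSplit: a released split no longer holds the watermark back. */
    void releaseOutputForSplit(String splitId) {
        splitWatermarks.remove(splitId);
    }

    /** Records a split-local watermark; a split's watermark never moves backwards. */
    void onSplitWatermark(String splitId, long watermark) {
        splitWatermarks.merge(splitId, watermark, Math::max);
    }

    /** Returns the combined watermark: the minimum over active splits, never regressing. */
    long combinedWatermark() {
        long min = Long.MAX_VALUE;
        for (long w : splitWatermarks.values()) {
            min = Math.min(min, w);
        }
        if (!splitWatermarks.isEmpty() && min > emitted) {
            emitted = min;
        }
        return emitted;
    }
}
```

In this sketch only the combined value would be emitted downstream, which matches the requirement that no watermark generator runs on the merged stream once per-split generators exist.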