Stephan Ewen created FLINK-17899:
------------------------------------

             Summary: Integrate FLIP-126 Timestamps and Watermarking with 
FLIP-27 sources
                 Key: FLINK-17899
                 URL: https://issues.apache.org/jira/browse/FLINK-17899
             Project: Flink
          Issue Type: Sub-task
          Components: API / DataStream
            Reporter: Stephan Ewen
            Assignee: Stephan Ewen


*Preamble:* This whole discussion is, to some extent, only necessary because in 
the {{SourceReader}} we pass the {{SourceOutput}} as a parameter to the 
{{pollNext(...)}} method. However, this design follows from deeper design 
decisions in the runtime pipeline and is not easy to change at this stage.

 

There are some principal design choices here:

 

*(1) Do we make Timestamps and Watermarks purely a feature of the library 
(ConnectorBase), or do we integrate them with the core (SourceOperator)?*

Making this purely a responsibility of the ConnectorBase would have the 
advantage of keeping the SourceOperator simple. However, there is value in 
integrating it with the SourceOperator:
 - Implementations that do not use the ConnectorBase (such as simple 
collection- or iterator-based sources) would automatically get access to the 
pluggable TimestampExtractors and WatermarkGenerators.

 - When executing batch programs, the SourceOperator can transparently inject a 
"no-op" WatermarkGenerator to make sure no Watermarks are generated during the 
batch execution. Given that batch sources are very performance sensitive, it 
seems better not to run the watermark generator logic at all, rather than 
running it and dropping the watermarks later.

 - In a future version, we may want to implement "global watermark holds" 
generated by the Enumerators: the enumerator tells the readers how far they may 
advance their local watermarks. This can help avoid prematurely advancing the 
watermark based on one split's records when other splits have data overlapping 
with older time ranges. A common example of this is the streaming file source.
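To make the batch point concrete, here is a minimal sketch, assuming simplified stand-ins for the FLIP-126 interfaces. The names {{WatermarkOutput}}, {{WatermarkGenerator}}, {{BoundedOutOfOrdernessGenerator}}, {{NoWatermarksGenerator}} and {{WatermarkGeneratorSelector}} and their signatures are illustrative only, not the actual Flink API. The operator, rather than the connector, chooses the generator, so in batch mode the per-record watermark logic never runs:

{code:java}
// Simplified, hypothetical stand-ins loosely modelled on FLIP-126; not the Flink API.
interface WatermarkOutput {
    void emitWatermark(long watermark);
}

interface WatermarkGenerator<T> {
    void onEvent(T event, long eventTimestamp, WatermarkOutput output);

    void onPeriodicEmit(WatermarkOutput output);
}

// Streaming generator: tracks the highest timestamp seen and periodically emits
// (highest timestamp - allowed out-of-orderness - 1).
final class BoundedOutOfOrdernessGenerator<T> implements WatermarkGenerator<T> {
    private final long maxOutOfOrderness;
    private long maxTimestamp = Long.MIN_VALUE;

    BoundedOutOfOrdernessGenerator(long maxOutOfOrderness) {
        this.maxOutOfOrderness = maxOutOfOrderness;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        if (maxTimestamp != Long.MIN_VALUE) { // nothing to emit before the first record
            output.emitWatermark(maxTimestamp - maxOutOfOrderness - 1);
        }
    }
}

// Batch generator: does no per-record work and never emits a watermark.
final class NoWatermarksGenerator<T> implements WatermarkGenerator<T> {
    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {}

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {}
}

// Hypothetical hook in the SourceOperator: the operator, not the connector,
// decides which generator runs, so batch jobs skip the watermark logic entirely.
final class WatermarkGeneratorSelector {
    static <T> WatermarkGenerator<T> select(boolean batchExecution, WatermarkGenerator<T> configured) {
        return batchExecution ? new NoWatermarksGenerator<T>() : configured;
    }
}
{code}

Because the substitution happens inside the operator, sources that do not use the ConnectorBase get the same batch optimization without any code of their own.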

 

*(2) Is the per-partition watermarking purely a feature of the library 
(ConnectorBase), or do we integrate it with the core (SourceOperator)?*

I believe we need to solve this on the same level as the previous question:
 - Once a connector instantiates the per-partition watermark generators, the 
main output (through which the SourceReader emits the records) must not run its 
own watermark generator any more. Otherwise we would also extract watermarks on 
the merged stream, where records from all partitions are interleaved, and those 
watermarks would interfere with the per-partition ones. So simply placing the 
per-partition watermark generators in the ConnectorBase and emitting 
transparently through an unchanged main output would not work.

 - So, if we decide to implement watermark support in the core 
(SourceOperator), we would need to offer the per-partition watermarking 
utilities on that level as well.

 - Along similar lines as the previous point, batch execution can optimize the 
watermark extraction by supplying no-op extractors for the per-partition 
extractors as well (which will most likely bear the bulk of the load in the 
connectors).
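A minimal sketch of why the main output must not run its own generator once per-partition generators exist, assuming hypothetical names ({{PerSplitWatermarks}} and its methods are illustrative, not Flink code): each split tracks its own watermark (here simply the highest timestamp seen in that split), and the output watermark is the minimum across all active splits. A generator on the merged stream would instead follow the fastest split and turn the slower splits' records into late data.

{code:java}
import java.util.HashMap;
import java.util.Map;
import java.util.function.LongConsumer;

// Hypothetical sketch, not Flink code: per-split watermarks combined by taking
// the minimum, instead of running one generator over the merged stream.
final class PerSplitWatermarks {
    private final Map<String, Long> splitWatermarks = new HashMap<>();
    private final LongConsumer downstream;
    private long lastEmitted = Long.MIN_VALUE;

    PerSplitWatermarks(LongConsumer downstream) {
        this.downstream = downstream;
    }

    void addSplit(String splitId) {
        splitWatermarks.put(splitId, Long.MIN_VALUE); // no records seen yet
    }

    // Per-split "generator": the highest timestamp seen in that split
    // (no out-of-orderness bound, to keep the sketch short).
    void onRecord(String splitId, long timestamp) {
        splitWatermarks.merge(splitId, timestamp, Math::max);
    }

    void releaseSplit(String splitId) {
        splitWatermarks.remove(splitId);
    }

    // Called periodically: forward min(per-split watermarks) only if it advanced.
    void emitCombined() {
        if (splitWatermarks.isEmpty()) {
            return;
        }
        long min = Long.MAX_VALUE;
        for (long wm : splitWatermarks.values()) {
            min = Math.min(min, wm);
        }
        if (min > lastEmitted) {
            lastEmitted = min;
            downstream.accept(min);
        }
    }
}
{code}

Note that a split that has not produced any records yet holds back the combined watermark; handling idle splits is left out of this sketch.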

 

*(3) What would an integration of WatermarkGenerators with the SourceOperator 
look like?*

Rather straightforward: the SourceOperator instantiates a SourceOutput that 
internally runs the timestamp extractor and watermark generator and emits into 
the same DataOutput that the operator emits to.
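A minimal sketch of such a wrapping output, assuming simplified, hypothetical types: {{DataOutputSketch}}, {{WatermarkGeneratorSketch}} and {{TimestampsAndWatermarksOutput}} are illustrative names, not the Flink API, and the timestamp extractor is modelled as a plain {{ToLongFunction}}.

{code:java}
import java.util.function.LongConsumer;
import java.util.function.ToLongFunction;

// Hypothetical, simplified types for illustration only; not the Flink API.
interface DataOutputSketch<T> {
    void emitRecord(T record, long timestamp);

    void emitWatermark(long watermark);
}

interface WatermarkGeneratorSketch<T> {
    void onEvent(T event, long eventTimestamp, LongConsumer watermarkOut);

    void onPeriodicEmit(LongConsumer watermarkOut);
}

// The output handed to the reader: extracts the timestamp, forwards the record,
// feeds the generator, and routes generated watermarks to the same DataOutput.
final class TimestampsAndWatermarksOutput<T> {
    private final ToLongFunction<T> timestampExtractor;
    private final WatermarkGeneratorSketch<T> generator;
    private final DataOutputSketch<T> dataOutput;
    private final LongConsumer watermarkOut;

    TimestampsAndWatermarksOutput(
            ToLongFunction<T> timestampExtractor,
            WatermarkGeneratorSketch<T> generator,
            DataOutputSketch<T> dataOutput) {
        this.timestampExtractor = timestampExtractor;
        this.generator = generator;
        this.dataOutput = dataOutput;
        this.watermarkOut = dataOutput::emitWatermark;
    }

    // Called by the SourceReader for every record.
    void collect(T record) {
        long timestamp = timestampExtractor.applyAsLong(record);
        dataOutput.emitRecord(record, timestamp);
        generator.onEvent(record, timestamp, watermarkOut);
    }

    // Called by the operator, for example on a periodic timer (assumed).
    void emitPeriodicWatermark() {
        generator.onPeriodicEmit(watermarkOut);
    }
}
{code}

The reader only ever calls {{collect(...)}}; how often {{emitPeriodicWatermark()}} is invoked is an assumption left to the operator.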

 

*(4) What would an integration of the per-split WatermarkGenerators look like?*

I would propose to add a method to the {{SourceReaderContext}}: 
{{SplitAwareOutputs createSourceAwareOutputs()}}

The {{SplitAwareOutputs}} interface would look as follows:
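{code:java}
import org.apache.flink.api.connector.source.SourceOutput;

public interface SplitAwareOutputs<T> {

    /** Creates the output through which the records of the given split are emitted. */
    SourceOutput<T> createOutputForSplit(String splitId);

    /** Releases the output of the given split, for example once the split is finished. */
    void releaseOutputForSplit(String splitId);
}
{code}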
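A sketch of how a reader might use these outputs, assuming it obtains the {{SplitAwareOutputs}} once from {{createSourceAwareOutputs()}} and keeps the per-split outputs across {{pollNext(...)}} calls. Here {{SourceOutput}} is reduced to a single {{collect}} method, and {{SplitAwareReaderSketch}} with its callbacks is hypothetical:

{code:java}
import java.util.HashMap;
import java.util.Map;

// Simplified stand-in for SourceOutput (only the method this sketch needs).
interface SourceOutput<T> {
    void collect(T record);
}

// The interface proposed above.
interface SplitAwareOutputs<T> {
    SourceOutput<T> createOutputForSplit(String splitId);

    void releaseOutputForSplit(String splitId);
}

// Hypothetical reader fragment: one output per split, so each split's records
// pass through that split's own timestamp extractor and watermark generator.
final class SplitAwareReaderSketch<T> {
    private final SplitAwareOutputs<T> outputs;
    private final Map<String, SourceOutput<T>> activeOutputs = new HashMap<>();

    SplitAwareReaderSketch(SplitAwareOutputs<T> outputs) {
        this.outputs = outputs;
    }

    void onSplitAssigned(String splitId) {
        activeOutputs.put(splitId, outputs.createOutputForSplit(splitId));
    }

    void onRecordFetched(String splitId, T record) {
        SourceOutput<T> out = activeOutputs.get(splitId);
        if (out == null) {
            throw new IllegalStateException("Unknown split: " + splitId);
        }
        out.collect(record);
    }

    // Releases the split's output exactly once, even if called repeatedly.
    void onSplitFinished(String splitId) {
        if (activeOutputs.remove(splitId) != null) {
            outputs.releaseOutputForSplit(splitId);
        }
    }
}
{code}

Releasing the output when a split finishes lets the operator drop that split's watermark state, so a finished split no longer holds back the combined watermark.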



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
