[ https://issues.apache.org/jira/browse/FLINK-15670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17061669#comment-17061669 ]

Yuan Mei commented on FLINK-15670:
----------------------------------

 

Some updates: I have finished a new version of the POC as promised (Prototype 
version III). The new version can propagate watermarks through Kafka, a 
must-have for a shuffle-like service.

*Prototype version III* 

Link: 
[https://github.com/apache/flink/compare/master...curcur:kafka_shuffle_III?expand=1]

There is a test case, `KafkaSimpleITCase.testKafkaWatermark()`, that serves as a 
running example of how watermarks are handled and propagated through the pipeline. 

{{// ExecutionGraph:}}
{{// Source0 -- WatermarkAssigner -- KafkaSink0}}
{{//                                               Partition0   KafkaSource0}}
{{// Source1 -- WatermarkAssigner -- KafkaSink1}}
{{//                                               Partition1   KafkaSource1}}
{{// Source2 -- WatermarkAssigner -- KafkaSink2}}

Input data is fixed and reproducible.
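One way watermarks can travel through a Kafka topic is as specially tagged records interleaved with the data, so the consumer can tell the two apart and rebuild the watermark stream. The sketch below shows one hypothetical envelope format (`ShuffleEnvelope` and its layout are illustrative assumptions, not the POC's actual wire format):

```java
import java.nio.ByteBuffer;

// Illustrative only: a possible wire format for interleaving data records
// and watermarks in one Kafka topic. Tag byte 0 = data, 1 = watermark.
public class ShuffleEnvelope {
    static final byte TAG_RECORD = 0;
    static final byte TAG_WATERMARK = 1;

    // Watermark layout: [tag:1][producerSubtask:4][timestamp:8]
    static byte[] encodeWatermark(int producerSubtask, long timestamp) {
        return ByteBuffer.allocate(1 + 4 + 8)
                .put(TAG_WATERMARK).putInt(producerSubtask).putLong(timestamp)
                .array();
    }

    // Data record layout: [tag:1][payload]
    static byte[] encodeRecord(byte[] payload) {
        return ByteBuffer.allocate(1 + payload.length)
                .put(TAG_RECORD).put(payload).array();
    }

    static boolean isWatermark(byte[] bytes) {
        return bytes.length > 0 && bytes[0] == TAG_WATERMARK;
    }

    static int watermarkProducer(byte[] bytes) {
        return ByteBuffer.wrap(bytes, 1, 4).getInt();
    }

    static long watermarkTimestamp(byte[] bytes) {
        return ByteBuffer.wrap(bytes, 5, 8).getLong();
    }
}
```

The producer subtask index is carried in the watermark envelope so the consumer can track per-producer progress (see the follow-up items below).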

*Sketched Implementation Details*
 * On the producer side, `FlinkKafkaShuffleProducer` extends 
`FlinkKafkaProducer` and overrides its `invoke` function to control how data is 
written to partitions.
 * As mentioned in the previous comment, there is no API I can use directly to 
manipulate watermarks. Hence, this POC uses the context in the 
SinkFunction (KafkaShuffleProducer) to simulate accessing the watermark. As a 
result, we cannot get the last watermark or the end-of-stream watermark 
(the watermark after the last record).
 * On the consumer side, `AbstractFetcher.runFetchLoop` is overridden to emit 
both records and watermarks.
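The producer-side routing described above can be simulated without Flink: data records go to the single partition chosen by their key, while watermarks are broadcast to every partition so that each downstream consumer sees the watermarks of all producers. A minimal sketch (`ShuffleRouter` is a hypothetical stand-in; real Flink routing goes through murmur-hashed key groups via `KeyGroupRangeAssignment`, not raw `hashCode`):

```java
import java.util.ArrayList;
import java.util.List;

// Minimal stand-in for FlinkKafkaShuffleProducer's invoke() routing:
// data records go to one key-derived partition, watermarks to all.
public class ShuffleRouter {
    final List<List<String>> partitions = new ArrayList<>();

    ShuffleRouter(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // Simplified key -> partition mapping (assumption; Flink uses key groups).
    int partitionFor(Object key) {
        return Math.floorMod(key.hashCode(), partitions.size());
    }

    // A data record lands in exactly one partition.
    void invokeRecord(Object key, String value) {
        partitions.get(partitionFor(key)).add("REC:" + value);
    }

    // A watermark is broadcast to every partition.
    void invokeWatermark(long timestamp) {
        for (List<String> p : partitions) {
            p.add("WM:" + timestamp);
        }
    }
}
```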

*Conclusions*

Most of the uncertainties have been addressed by this POC. Still, a few things 
are worth thinking about and investigating:
 # Access the watermark directly instead of getting it through the context.
 # How to decide the end of a stream/partition (I think this is solved once 
item 1 is resolved).
 # Whether the watermark should be part of `partitionState`: I do not think so, 
at least in this version.
 # We force a 1:1 mapping of partition to consumer task in this version, so we 
do not need to handle cross-partition watermarks. Cross-partition watermarks 
should not be hard to handle, though.

A few more details to follow up on:
 # A reasonable way to pass the `producer parallelism` into the Kafka fetcher, 
which needs this information to decide when to start emitting watermarks.
 # Unify the cases with and without watermarks.
 # Improve serialization & deserialization.
 # Check whether failover/checkpointing works as expected in this case.
 # Map a `DataStream` to a `KeyedStream`.
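Item 1 above is why the fetcher must know the producer parallelism: a partition interleaves watermarks from every upstream producer subtask, and the consumer may only emit min(per-producer watermarks) once all producers have reported at least once. A hedged sketch of that bookkeeping (`WatermarkCombiner` is a hypothetical name, not part of the POC):

```java
import java.util.HashMap;
import java.util.Map;

// Sketch: a partition carries watermarks from all producer subtasks, so the
// consumer needs producerParallelism to know when the minimum is meaningful.
public class WatermarkCombiner {
    final int producerParallelism;
    final Map<Integer, Long> perProducer = new HashMap<>();

    WatermarkCombiner(int producerParallelism) {
        this.producerParallelism = producerParallelism;
    }

    // Returns the watermark to emit downstream, or null while some
    // producer subtask has not reported a watermark yet.
    Long onWatermark(int producerSubtask, long timestamp) {
        perProducer.merge(producerSubtask, timestamp, Math::max);
        if (perProducer.size() < producerParallelism) {
            return null; // still waiting for some producers
        }
        return perProducer.values().stream()
                .mapToLong(Long::longValue).min().getAsLong();
    }
}
```

This also hints at item 2 of the conclusions: once watermarks are accessed directly, an end-of-stream marker per producer could flow through the same bookkeeping.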

 

> Provide a Kafka Source/Sink pair that aligns Kafka's Partitions and Flink's 
> KeyGroups
> -------------------------------------------------------------------------------------
>
>                 Key: FLINK-15670
>                 URL: https://issues.apache.org/jira/browse/FLINK-15670
>             Project: Flink
>          Issue Type: New Feature
>          Components: API / DataStream, Connectors / Kafka
>            Reporter: Stephan Ewen
>            Priority: Major
>              Labels: usability
>             Fix For: 1.11.0
>
>
> This Source/Sink pair would serve two purposes:
> 1. You can read topics that are already partitioned by key and process them 
> without partitioning them again (avoid shuffles)
> 2. You can use this to shuffle through Kafka, thereby decomposing the job 
> into smaller jobs and independent pipelined regions that fail over 
> independently.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
