[
https://issues.apache.org/jira/browse/FLINK-15670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17061669#comment-17061669
]
Yuan Mei commented on FLINK-15670:
----------------------------------
Some updates: I have finished a new version of the POC as promised (Prototype
version III). The new version can propagate watermarks through Kafka, a
must-have for a shuffle-like service.
*Prototype version III*
Link:
[https://github.com/apache/flink/compare/master...curcur:kafka_shuffle_III?expand=1]
There is a test case, `KafkaSimpleITCase.testKafkaWatermark()`, that serves as a
running example of how watermarks are handled and propagated in the pipeline.
{code}
// ExecutionGraph:
// Source0 -- WatermarkAssigner -- KafkaSink0
//                                             Partition0    KafkaSource0
// Source1 -- WatermarkAssigner -- KafkaSink1
//                                             Partition1    KafkaSource1
// Source2 -- WatermarkAssigner -- KafkaSink2
{code}
The input data is fixed and reproducible.
*Sketched Implementation Details*
* On the producer side, `FlinkKafkaShuffleProducer` extends
`FlinkKafkaProducer` and overrides its `invoke` function to control how data is
written to partitions.
* As mentioned in the previous comment, there is no API I can use directly to
manipulate watermarks. Hence, this POC uses the context in the SinkFunction
(KafkaShuffleProducer) to simulate watermark access. As a result, we cannot get
the last watermark or the end-of-stream watermark (the watermark after the last
record).
* On the consumer side, `AbstractFetcher.runFetchLoop` is overridden to emit
records and watermarks.
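To make the shape of the idea concrete: since records are keyed to a single partition but every downstream task must see every watermark, the producer can broadcast each watermark to all partitions while routing records by key. The following is a minimal, self-contained sketch of that scheme; the class and method names are illustrative and not the actual `FlinkKafkaShuffleProducer` code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch, not the real Flink API: records go only to the
// partition chosen by their key, while watermarks are broadcast to every
// partition so each downstream consumer task eventually sees them.
class ShuffleWatermarkSketch {

    /** A record or a watermark travelling through the Kafka "shuffle". */
    static final class Element {
        final boolean isWatermark;
        final long watermark; // meaningful only if isWatermark
        final String key;     // meaningful only if !isWatermark
        final String value;

        private Element(boolean isWatermark, long watermark, String key, String value) {
            this.isWatermark = isWatermark;
            this.watermark = watermark;
            this.key = key;
            this.value = value;
        }

        static Element record(String key, String value) {
            return new Element(false, Long.MIN_VALUE, key, value);
        }

        static Element watermark(long ts) {
            return new Element(true, ts, null, null);
        }
    }

    /** One list per Kafka partition, standing in for the real producer. */
    final List<List<Element>> partitions = new ArrayList<>();

    ShuffleWatermarkSketch(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    /** Simplified invoke(): a record goes only to its key's partition. */
    void invoke(String key, String value) {
        int p = Math.abs(key.hashCode() % partitions.size());
        partitions.get(p).add(Element.record(key, value));
    }

    /** Watermarks are broadcast: every partition gets a copy. */
    void invokeWatermark(long watermark) {
        for (List<Element> partition : partitions) {
            partition.add(Element.watermark(watermark));
        }
    }
}
```

On the consumer side, a fetcher that reads such a partition simply re-emits records as records and watermark elements as watermarks.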
*Conclusions*
The majority of the uncertainties are addressed by this POC. Still, a few
things are worth thinking about and investigating:
# Access watermarks directly instead of obtaining them through the context
# How to decide the end of a stream/partition (I think this can be solved once
item 1 is resolved)
# Whether the watermark should be part of the `partitionState`: I do not think
so, at least in this version.
# We force a 1:1 mapping of partitions to consumer tasks in this version, hence
we do not need to handle cross-partition watermarks. Cross-partition watermarks
should not be hard to handle, though.
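For reference, the 1:1 mapping rests on routing each key to the partition whose index equals the consumer task that owns the key's key group. A simplified sketch of that alignment follows; it assumes `hashCode` stands in for Flink's murmur-based key-group hash and that the number of partitions equals the consumer parallelism, so it is illustrative rather than Flink's exact code.

```java
// Simplified sketch of partition/key-group alignment (illustrative; Flink
// additionally applies a murmur hash on top of the key's hashCode()).
class KeyGroupAlignmentSketch {

    /** Key group of a key, in [0, maxParallelism). */
    static int keyGroup(Object key, int maxParallelism) {
        return Math.abs(key.hashCode() % maxParallelism);
    }

    /** Consumer task that owns a key group, following range assignment. */
    static int taskIndex(int keyGroup, int maxParallelism, int parallelism) {
        return keyGroup * parallelism / maxParallelism;
    }

    /**
     * With numPartitions == consumer parallelism, writing each record to this
     * partition yields the 1:1 partition-to-consumer-task mapping, so no
     * extra shuffle is needed on the consumer side.
     */
    static int partitionFor(Object key, int maxParallelism, int parallelism) {
        return taskIndex(keyGroup(key, maxParallelism), maxParallelism, parallelism);
    }
}
```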
A few more details to follow up on:
# A reasonable way to pass the `producer parallelism` into the Kafka fetcher;
the fetcher needs this information to decide when to start emitting watermarks.
# Unify the cases with and without watermarks
# Improve serialization & deserialization
# Check whether failover/checkpointing works as expected in this case.
# Map a `DataStream` -> `KeyedDataStream`
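On the first follow-up item, the reason the fetcher needs the producer parallelism is that each partition receives watermarks from every producer subtask, and the combined watermark (the minimum across subtasks) is only defined once all of them have reported one. A hypothetical sketch of that aggregation; the class and method names are illustrative, not part of the POC.

```java
import java.util.Arrays;

// Hypothetical sketch: tracks the latest watermark per producer subtask and
// emits the minimum only after all producerParallelism subtasks have reported.
class WatermarkAggregatorSketch {
    private final long[] perSubtask;

    WatermarkAggregatorSketch(int producerParallelism) {
        perSubtask = new long[producerParallelism];
        Arrays.fill(perSubtask, Long.MIN_VALUE); // MIN_VALUE == "not yet seen"
    }

    /**
     * Records a watermark from one producer subtask. Returns the combined
     * watermark, or Long.MIN_VALUE while some subtask has not reported yet.
     */
    long onWatermark(int producerSubtask, long watermark) {
        perSubtask[producerSubtask] = Math.max(perSubtask[producerSubtask], watermark);
        long min = Long.MAX_VALUE;
        for (long w : perSubtask) {
            if (w == Long.MIN_VALUE) {
                return Long.MIN_VALUE; // cannot emit yet
            }
            min = Math.min(min, w);
        }
        return min;
    }
}
```

The same minimum-tracking shape would also cover cross-partition watermarks if the 1:1 partition-to-task mapping were relaxed.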
> Provide a Kafka Source/Sink pair that aligns Kafka's Partitions and Flink's
> KeyGroups
> -------------------------------------------------------------------------------------
>
> Key: FLINK-15670
> URL: https://issues.apache.org/jira/browse/FLINK-15670
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream, Connectors / Kafka
> Reporter: Stephan Ewen
> Priority: Major
> Labels: usability
> Fix For: 1.11.0
>
>
> This Source/Sink pair would serve two purposes:
> 1. You can read topics that are already partitioned by key and process them
> without partitioning them again (avoid shuffles)
> 2. You can use this to shuffle through Kafka, thereby decomposing the job
> into smaller jobs and independent pipelined regions that fail over
> independently.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)