[
https://issues.apache.org/jira/browse/FLINK-15670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17056757#comment-17056757
]
Yuan Mei commented on FLINK-15670:
----------------------------------
Some updates since last sync up:
*Watermark*
For watermarks, Stephan is proposing to keep each producer subtask's watermark in
every partition, and to merge/align them on the consumer side. This is an improved
version of the second solution proposed above. The benefit is that it avoids an
exponential number of partitions.
+_*Producer Side*_+
*Stored Data Format*
{code:java}
TAG_RECORD, DATA<T>
TAG_WATERMARK, SUB_TASK_INDEX, WATERMARK(LONG)
{code}
The original idea was to store the `StreamElement` directly. However, there are
two problems:
# As stated above, a watermark needs to be associated with a subtask index, but
`StreamElement` does not carry a subtask number.
# There are constraints implied by the TYPE parameters. For example,
`DataStreamSink` requires the input stream and the `StreamSink` to have the same
type, and this type is also the type of element the invokable sink function
operates on. Breaking these constraints would require many API changes.
Instead, I added a class `FlinkKafkaShuffleProducer` that extends
`FlinkKafkaProducer` and overrides its `invoke` function. The function provides an
internal serializer that can handle both `Record` and `Watermark`.
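For illustration, here is a minimal, self-contained sketch of the tag-prefixed stored format above, using a plain `ByteBuffer` and no Flink types (the class and method names are hypothetical, not the actual serializer):
{code:java}
import java.nio.ByteBuffer;

// Hypothetical sketch of the tag-prefixed stored data format:
//   TAG_RECORD, DATA<T>
//   TAG_WATERMARK, SUB_TASK_INDEX, WATERMARK(LONG)
class ShuffleElementSerializer {
    static final byte TAG_RECORD = 0;
    static final byte TAG_WATERMARK = 1;

    // Prefix already-serialized record bytes with the record tag.
    static byte[] serializeRecord(byte[] recordBytes) {
        return ByteBuffer.allocate(1 + recordBytes.length)
                .put(TAG_RECORD)
                .put(recordBytes)
                .array();
    }

    // A watermark carries the producer subtask index so that the
    // consumer can align watermarks across producer subtasks.
    static byte[] serializeWatermark(int subtaskIndex, long watermark) {
        return ByteBuffer.allocate(1 + Integer.BYTES + Long.BYTES)
                .put(TAG_WATERMARK)
                .putInt(subtaskIndex)
                .putLong(watermark)
                .array();
    }
}
{code}
A single leading tag byte is enough for the consumer to dispatch between the two element kinds without changing the record type exposed to the API.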
*How a Watermark is propagated*
The producer (sink function) is only invoked when an element (`StreamRecord`) is
processed, so I am using the context associated with the element to get the
watermark. This is a bit tricky, since a watermark is always placed behind an
element; as a result, I will not get the final watermark at the end of the stream.
I also considered invoking the producer (sink function) from `processWatermark`.
However, doing it that way would likewise require many API changes, and I would
like to avoid such interface changes if possible.
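To illustrate the idea of piggybacking the watermark onto element processing, here is a stand-alone sketch; the `SinkContext` stand-in mimics `SinkFunction.Context#currentWatermark`, and all names are illustrative rather than the actual implementation:
{code:java}
import java.util.ArrayList;
import java.util.List;

// Stand-in for SinkFunction.Context#currentWatermark (illustrative only).
interface SinkContext {
    long currentWatermark();
}

// Sketch: the sink is only invoked per element, so the watermark is read
// from the element's context and written ahead of the record whenever it
// has advanced. A trailing watermark with no element behind it is missed.
class ShuffleSinkSketch {
    long lastEmittedWatermark = Long.MIN_VALUE;
    final List<String> out = new ArrayList<>();

    void invoke(String value, SinkContext ctx) {
        long wm = ctx.currentWatermark();
        if (wm > lastEmittedWatermark) {
            out.add("WATERMARK:" + wm);
            lastEmittedWatermark = wm;
        }
        out.add("RECORD:" + value);
    }
}
{code}
This also makes the limitation above concrete: a watermark only reaches the partition once the next element arrives.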
_*+Consumer Side+*_
*How data is consumed and a watermark is retrieved*
Currently, data is deserialized in `KafkaFetcher.runFetchLoop`, and watermarks are
regenerated in `AbstractFetcher.emitRecordWithTimestampAndPunctuatedWatermark`.
Hence, I am going to override these two functions.
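The consumer-side merge/align step could look roughly like this sketch (the `WatermarkMerger` class is hypothetical, not the actual Flink code): remember the latest watermark seen from each producer subtask and emit the minimum over all subtasks as the consumer's combined watermark.
{code:java}
import java.util.Arrays;

// Hypothetical sketch of consumer-side watermark alignment: the aligned
// watermark is the minimum of the latest watermarks from all producer
// subtasks, so it never overtakes the slowest producer.
class WatermarkMerger {
    private final long[] perSubtask;

    WatermarkMerger(int numProducerSubtasks) {
        perSubtask = new long[numProducerSubtasks];
        Arrays.fill(perSubtask, Long.MIN_VALUE);
    }

    // Returns the aligned watermark after observing one subtask's update.
    long onWatermark(int subtaskIndex, long watermark) {
        perSubtask[subtaskIndex] = Math.max(perSubtask[subtaskIndex], watermark);
        long min = Long.MAX_VALUE;
        for (long w : perSubtask) {
            min = Math.min(min, w);
        }
        return min;
    }
}
{code}
Until every producer subtask has reported at least once, the aligned watermark stays at `Long.MIN_VALUE`, which keeps event time from advancing prematurely.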
The producer side is almost done, and the consumer side is under construction.
> Provide a Kafka Source/Sink pair that aligns Kafka's Partitions and Flink's
> KeyGroups
> -------------------------------------------------------------------------------------
>
> Key: FLINK-15670
> URL: https://issues.apache.org/jira/browse/FLINK-15670
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream, Connectors / Kafka
> Reporter: Stephan Ewen
> Priority: Major
> Labels: usability
> Fix For: 1.11.0
>
>
> This Source/Sink pair would serve two purposes:
> 1. You can read topics that are already partitioned by key and process them
> without partitioning them again (avoid shuffles)
> 2. You can use this to shuffle through Kafka, thereby decomposing the job
> into smaller jobs and independent pipelined regions that fail over
> independently.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)