[
https://issues.apache.org/jira/browse/FLINK-15670?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17093323#comment-17093323
]
Yuan Mei commented on FLINK-15670:
----------------------------------
Thanks [~AHeise] for reviewing the code! Much appreciated :)
We've finished the first round of review. I am summarizing the current
status here: we've reached agreement on everything except the API change. That
does not mean we disagree on the API change, but we cannot make the final
decision yet since it involves public API changes. The review centered on four
questions:
h4. Not yet decided:
h4. 1. API change – changes to {{SinkFunction}}
* As suggested by Stephan, we have a custom operator {{StreamShuffleSink}} and
a custom transformation in {{SinkTransformation}} for the custom operator.
{{StreamShuffleSink}} is very similar to {{StreamSink}} except for how it
handles watermarks: it invokes a user function to handle each watermark.
* {{SinkFunction}}, as its name suggests, is the function invoked in the sink
operator; it provides an invoke method to handle each record.
{{FlinkKafkaProducer}} itself is a {{TwoPhaseCommitSinkFunction}}, which
implements {{SinkFunction}}.
* The current change to {{SinkFunction}} is adding an invoke method to handle
watermarks, with an unsupported default implementation (this avoids any effect
on existing functions implementing {{SinkFunction}}); see the sketch after
this list.
* If we really want to avoid changing {{SinkFunction}}, I can add a new
interface and have the current {{TwoPhaseCommitSinkFunction}} implement it.
That should be safer than the current way (but a bit weird TBH).
{{TwoPhaseCommitSinkFunction}} is only used by the Kafka producer for now, but
may be used by others as well.
* The bottom line is to avoid duplicating the entire Kafka producer function.
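To make the first option concrete, here is a minimal sketch of the
{{SinkFunction}} change (the method name and simplified signature are my
illustration, not the final API):
{code:java}
import java.io.Serializable;

import org.apache.flink.api.common.functions.Function;
import org.apache.flink.streaming.api.watermark.Watermark;

// Sketch only: SinkFunction gains a watermark-handling method whose default
// implementation throws, so all existing SinkFunction implementations keep
// compiling and behaving exactly as before.
public interface SinkFunction<IN> extends Function, Serializable {

    // existing record-handling method (signature simplified here)
    void invoke(IN value) throws Exception;

    // proposed addition: called by the custom StreamShuffleSink operator
    // for every watermark; unsupported unless a sink opts in
    default void invoke(Watermark watermark) throws Exception {
        throw new UnsupportedOperationException(
                "This SinkFunction does not handle watermarks.");
    }
}
{code}
{{StreamShuffleSink}} would then override {{processWatermark(Watermark)}} to
also call this method, which is its only behavioral difference from
{{StreamSink}}.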
--------------------------------------------------------
h4. Agreed
h4. 2. {{StreamElementSerializer}}
{{StreamElementSerializer}} is not reusable because watermarks are stored and
handled differently here. In short, if multiple sink subtasks write to the
same partition (sink), we need a way to decide the watermark on the source
side (the downstream operator, from the shuffle perspective).
In the current netty shuffle service, we keep N channels and track watermarks
per channel; in this case, data and watermarks have already been merged when
writing to partitions (see the sketch below).
The discussion related to watermarks starts from here. It includes my original
thoughts, the proposals, and Stephan's enhanced version.
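As a rough illustration (the class and method names here are hypothetical, not
the actual fetcher code), the read side can track the latest watermark from
each producer subtask writing into its partition and only advance to the
minimum across all of them:
{code:java}
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-partition watermark merging on the read side.
// Each producer subtask tags the watermarks it writes with its subtask index;
// the fetcher advances the partition watermark to the minimum of the latest
// watermark seen from every producer writing to this partition.
class PartitionWatermarkMerger {

    private final int numProducers;
    private final Map<Integer, Long> latestPerProducer = new HashMap<>();

    PartitionWatermarkMerger(int numProducers) {
        this.numProducers = numProducers;
    }

    /** Returns the merged watermark, or Long.MIN_VALUE while undecidable. */
    long onWatermark(int producerSubtask, long watermark) {
        latestPerProducer.merge(producerSubtask, watermark, Math::max);
        if (latestPerProducer.size() < numProducers) {
            return Long.MIN_VALUE; // some producer has not emitted a watermark yet
        }
        long min = Long.MAX_VALUE;
        for (long w : latestPerProducer.values()) {
            min = Math.min(min, w);
        }
        return min;
    }
}
{code}
One consequence of this design is that the partition watermark cannot advance
until every producer subtask has written at least one watermark to that
partition.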
h4. 3. "Checkpoints" and "Savepoints"
Savepoints are very similar to checkpoints, except that savepoints are more or
less user-facing. That is, a user can trigger a replay based on savepoints. I
guess I can kind of understand why "restoring from an old savepoint would
completely screw up the data". It is true if you think of this problem from a
global snapshotting and global failover perspective.
However, let's step back and think about why we want to have the persistent
shuffle in the first place. If data is persisted, we do not really need to
replay the computation again. Persistence is meant to remove the constraints
between upstream and downstream.
The bigger focus is how to do the replay based on savepoints in practice,
especially when wrong data is produced. For the very first version, we can
leave cluster maintenance and data management to users and document them well.
h4. 4. Why do we need {{KafkaShuffleProducer}} and {{KafkaShuffleFetcher}}?
Why not just reuse the current {{KafkaProducer}} and {{KafkaConsumer}}?
Actually, the previous two versions of the POC used the existing Kafka
producer and consumer. There are several reasons I decided to have separate
ones:
* Watermark write/read: as you can see, there is extra logic for handling
watermarks on both the read and write sides, and I do not want to interfere
with existing Kafka reads/writes.
* The current {{KafkaProducer}} can be used in two ways: through
{{KeyedSerializationSchema}} (deprecated) and through
{{KafkaSerializationSchema}}. If we want to use a custom partitioner, we have
to use {{KeyedSerializationSchema}} (the partitioner is bundled with it).
* {{KeyedSerializationSchema}} follows the Kafka key-value schema (the key is
written to Kafka's key and the value to Kafka's value); however, KafkaShuffle
writes both key and value into Kafka's value part (a sketch of this layout
follows).
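For illustration only (the tag bytes and field order are my assumption, not
the actual {{KafkaShuffleProducer}} format), the value layout could look
roughly like this:
{code:java}
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical layout: the record's key and value (and, for watermarks,
// the producer subtask index) are packed together into Kafka's value bytes;
// Kafka's own key field is unused in this sketch.
class ShuffleValueLayout {

    private static final byte TAG_RECORD = 0;
    private static final byte TAG_WATERMARK = 1;

    static byte[] record(byte[] keyBytes, byte[] valueBytes) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(TAG_RECORD);
        out.writeInt(keyBytes.length);   // key is stored inside the value part
        out.write(keyBytes);
        out.writeInt(valueBytes.length);
        out.write(valueBytes);
        return bytes.toByteArray();
    }

    static byte[] watermark(int producerSubtask, long watermark) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bytes);
        out.writeByte(TAG_WATERMARK);
        out.writeInt(producerSubtask);   // needed for watermark merging on read
        out.writeLong(watermark);
        return bytes.toByteArray();
    }
}
{code}
Since both kinds of elements share one byte stream per partition, the consumer
can tell records and watermarks apart by the leading tag byte while reading a
single partition sequentially.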
> Provide a Kafka Source/Sink pair that aligns Kafka's Partitions and Flink's
> KeyGroups
> -------------------------------------------------------------------------------------
>
> Key: FLINK-15670
> URL: https://issues.apache.org/jira/browse/FLINK-15670
> Project: Flink
> Issue Type: New Feature
> Components: API / DataStream, Connectors / Kafka
> Reporter: Stephan Ewen
> Priority: Major
> Labels: pull-request-available, usability
> Fix For: 1.11.0
>
> Time Spent: 10m
> Remaining Estimate: 0h
>
> This Source/Sink pair would serve two purposes:
> 1. You can read topics that are already partitioned by key and process them
> without partitioning them again (avoid shuffles)
> 2. You can use this to shuffle through Kafka, thereby decomposing the job
> into smaller jobs and independent pipelined regions that fail over
> independently.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)