Dear devs,

I would like to start a discussion for FLIP-186: Refactor
DataStreamUtils#reinterpretAsKeyedStream[1].

As we know, this API has the limitation that the partitioning of the base
stream must be exactly the same as Flink's internal partitioning[2].

Currently, Flink first selects the key of a record through the KeySelector
and then hashes it to get the corresponding KeyGroup.
                                               Record → Key → KeyGroup
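For reference, the Record → Key → KeyGroup path can be sketched as below.
This is a simplified mirror of what Flink does (the real implementation in
KeyGroupRangeAssignment additionally applies a murmur hash to the key's
hashCode); the helper names are illustrative, not Flink API.

```java
// Simplified sketch of how a record's key is mapped to a KeyGroup.
public class KeyGroupSketch {
    // Record → Key: extract the key (here the record itself, for brevity).
    static String selectKey(String record) {
        return record;
    }

    // Key → KeyGroup: hash the key into the range [0, maxParallelism).
    // Flink's actual code murmur-hashes key.hashCode() first.
    static int assignToKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    public static void main(String[] args) {
        int maxParallelism = 128;
        String record = "user-42";
        int keyGroup = assignToKeyGroup(selectKey(record), maxParallelism);
        System.out.println("KeyGroup of " + record + ": " + keyGroup);
    }
}
```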
If the base stream is already partitioned in the external system, there
exists a mapping as follows.
                                               Record → Key → Split
However, these two mappings are usually incompatible; an example is shown
in the FLIP[1]. Therefore, users have to re-partition their data stream
after the Source to resolve the incompatibility, which causes unnecessary
serde overhead as well as redundant KeyGroup computation.
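To make the incompatibility concrete, here is a small self-contained
demonstration. It assumes, purely for illustration, that the external
system assigns records to splits by key.hashCode() % numSplits, while
Flink hashes the same key into [0, maxParallelism); the two functions
generally disagree, so the records of one split scatter over many
KeyGroups.

```java
import java.util.Set;
import java.util.TreeSet;

public class MappingMismatch {
    public static void main(String[] args) {
        int numSplits = 3;        // partitions in the external system (assumed)
        int maxParallelism = 128; // Flink's key-group range

        // Collect the KeyGroups hit by records belonging to external split 0.
        Set<Integer> keyGroupsOfSplit0 = new TreeSet<>();
        for (int i = 0; i < 1000; i++) {
            String key = "key-" + i;
            int split = Math.floorMod(key.hashCode(), numSplits);
            int keyGroup = Math.floorMod(key.hashCode(), maxParallelism);
            if (split == 0) {
                keyGroupsOfSplit0.add(keyGroup);
            }
        }
        // Split 0 maps to many KeyGroups, so the base partitioning cannot
        // simply be reinterpreted as Flink's keyed partitioning.
        System.out.println("Split 0 spreads over "
                + keyGroupsOfSplit0.size() + " KeyGroups");
    }
}
```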

In this FLIP, I would like to address this limitation. The core idea is to
map every split to a key group, so that the assignment of a record to a
key group can be built without any mismatch.
                                               Record → Key → Split → KeyGroup

There are two options:

1. Require the users to provide the partitioner of the external system.
With this partitioner, Flink could tell which split a record belongs to,
so Flink would only need to build and maintain the mapping from Split to
KeyGroup. The downstream keyed operators could then combine the external
partitioner with this mapping to compute the KeyGroup of a record.
However, the partitioner of the external system is usually not a public
API, so it could be hard, and arguably meaningless, for a user to provide
Flink with such a partitioner.
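In effect, option 1 would ask users for something like the following
functional interface. The name ExternalPartitioner and its shape are
purely hypothetical here, not part of Flink or the FLIP; a hashCode-based
lambda stands in for a real connector's partitioner.

```java
// Hypothetical contract a user would implement for option 1: mirror the
// external system's record-to-split assignment so Flink can reuse it.
@FunctionalInterface
interface ExternalPartitioner<K> {
    // Returns the split (external partition) index for a key.
    int partition(K key, int numSplits);
}

public class Option1Sketch {
    public static void main(String[] args) {
        // Assumed example: an external system that partitions by hashCode.
        ExternalPartitioner<String> partitioner =
                (key, numSplits) -> Math.floorMod(key.hashCode(), numSplits);
        System.out.println("key 'a' -> split " + partitioner.partition("a", 3));
    }
}
```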

2. The reason we need the partitioner of the external system is to obtain
the mapping from a record to its partition, i.e., its split. This
information is readily available in the Source. The mapping from a split
to a KeyGroup could be built and maintained in the Source as well, since
the parallelism is equal when Source operators are chained with the
downstream keyed operators. As a result, all records from the same split
would be assigned to the same key group, while records from different
splits would be assigned to different key groups. More specifically,
whereas the key group of a record was previously computed by hashing the
key, it would now be assigned by the Source and passed to downstream
operators through the StreamRecord. Since this API can chain the upstream
and downstream operators together, the trade-off would be acceptable
compared with the cost caused by the re-partitioning.
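The Source-side bookkeeping of option 2 could look roughly like the sketch
below: the Source assigns each split a dedicated KeyGroup on first sight,
and every record then inherits the KeyGroup of its split. Class and method
names are illustrative only, not the FLIP's actual design.

```java
import java.util.HashMap;
import java.util.Map;

public class SplitKeyGroupAssigner {
    private final Map<String, Integer> splitToKeyGroup = new HashMap<>();
    private final int maxParallelism;
    private int nextKeyGroup = 0;

    SplitKeyGroupAssigner(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    // Split → KeyGroup: lazily assign the next free KeyGroup to a new
    // split, so records of one split always share one KeyGroup.
    int keyGroupFor(String splitId) {
        return splitToKeyGroup.computeIfAbsent(splitId, id -> {
            if (nextKeyGroup >= maxParallelism) {
                throw new IllegalStateException("more splits than KeyGroups");
            }
            return nextKeyGroup++;
        });
    }

    public static void main(String[] args) {
        SplitKeyGroupAssigner assigner = new SplitKeyGroupAssigner(128);
        // split-0 and split-1 get distinct KeyGroups; repeated lookups
        // for the same split are stable.
        System.out.println("split-0 -> " + assigner.keyGroupFor("split-0"));
        System.out.println("split-1 -> " + assigner.keyGroupFor("split-1"));
        System.out.println("split-0 -> " + assigner.keyGroupFor("split-0"));
    }
}
```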

After comparing these two options, option 2 seems more reasonable, so I
would like to take it to address the limitation of the "Pre-KeyedStream".
I was also wondering whether we could introduce this API into DataStream
formally once the limitation is removed. WDYT?

[1]
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184617636
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/experimental/

Best,
Senhong
