Dear devs,

I would like to start a discussion about FLIP-186: Refactor DataStreamUtils#reinterpretAsKeyedStream [1].
As we know, this API has the limitation that the partitioning of the base stream must exactly match Flink's internal partitioning [2]. Currently, Flink first selects the key of a record through the KeySelector and then hashes it to get the corresponding KeyGroup:

Record → Key → KeyGroup

If the base stream is already partitioned in the external system, the following mapping exists instead:

Record → Key → Split

However, these two mappings are usually incompatible; an example is shown in the FLIP [1]. As a result, users have to re-partition their data stream after the Source to address the incompatibility, which causes unnecessary serde overhead as well as redundant KeyGroup computation.

In this FLIP, I would like to remove this limitation. The best way would be to map every split to a key group, so that the assignment of a record to a key group can finally be built without any mismatch:

Record → Key → Split → KeyGroup

There are two options:

1. Require users to provide the partitioner of the external system. With this partitioner, Flink could determine which split a record belongs to, so Flink only needs to build and maintain the mapping from Split to KeyGroup. Downstream keyed operators could then combine the external partitioner with this mapping to compute the KeyGroup for a record. However, the partitioner of the external system is usually not a public API, so it could be hard, and arguably meaningless, for a user to provide Flink with such a partitioner.

2. The reason we need the partitioner of the external system is to obtain the mapping from a record to its partition, i.e., its split. This information is readily available in the Source. The mapping from a split to a KeyGroup can be built and maintained in the Source as well, since the parallelism is identical when Source operators are chained with the downstream keyed operators.
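To make option 2 concrete, here is a minimal, self-contained sketch in plain Java. All class and method names here are hypothetical (this is not Flink's actual API): it contrasts a simplified hash-based key-group assignment (the status quo; Flink's real implementation additionally applies murmur hashing) with a Source-side assigner that maps each newly discovered split to its own key group.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyGroupSketch {

    // Simplified stand-in for the current behavior: the key group is
    // derived by hashing the key. (Flink's real code lives in
    // KeyGroupRangeAssignment and uses murmur hashing on top of hashCode.)
    static int hashBasedKeyGroup(Object key, int maxParallelism) {
        return Math.floorMod(key.hashCode(), maxParallelism);
    }

    // Hypothetical option-2 assigner: the Source maintains a
    // Split -> KeyGroup mapping and stamps each record with the key
    // group of the split it was read from.
    static class SplitKeyGroupAssigner {
        private final Map<String, Integer> splitToKeyGroup = new HashMap<>();
        private final int maxParallelism;
        private int nextKeyGroup = 0;

        SplitKeyGroupAssigner(int maxParallelism) {
            this.maxParallelism = maxParallelism;
        }

        // Assign each newly seen split the next free key group, so records
        // from one split always share a key group and records from
        // different splits never do.
        int keyGroupFor(String splitId) {
            return splitToKeyGroup.computeIfAbsent(splitId, id -> {
                if (nextKeyGroup >= maxParallelism) {
                    throw new IllegalStateException(
                        "more splits than available key groups");
                }
                return nextKeyGroup++;
            });
        }
    }

    public static void main(String[] args) {
        SplitKeyGroupAssigner assigner = new SplitKeyGroupAssigner(128);
        // Same split -> same key group:
        int g0 = assigner.keyGroupFor("split-0");
        System.out.println(g0 == assigner.keyGroupFor("split-0"));
        // Different splits -> different key groups:
        System.out.println(g0 != assigner.keyGroupFor("split-1"));
    }
}
```

The point of the sketch is only that the Split → KeyGroup mapping is trivially cheap to maintain inside the Source, whereas option 1 would require users to hand Flink a usually-private external partitioner.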
After that, all records from the same split will be assigned to the same key group, while records from different splits will be assigned to different key groups. More specifically, the key group of a record is currently computed by hashing the key; with this change it would instead be assigned by the Source and passed to the downstream operators through the StreamRecord. Since this API chains the upstream and downstream operators together, the trade-off should be acceptable compared with the cost of re-partitioning.

Comparing the two options, option 2 seems more reasonable, so I would like to take option 2 to address the limitation of the "Pre-KeyedStream". I was also wondering whether we could formally introduce this API into DataStream once the limitation is removed.

WDYT?

[1] https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=184617636
[2] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/experimental/

Best,
Senhong