After we do any interesting operation (e.g. reduce) on a KeyedStream, the result becomes a DataStream. In many cases the output still (logically) has the same or compatible keys as the KeyedStream, but to do further operations on these keys we are forced to use keyBy again. This works semantically, but it is costly in two ways. First, it destroys the possibility of chaining, which is one of the most important optimization techniques. Second, keyBy greatly expands the connected components of tasks, which has implications for failover optimization.
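
For concreteness, this is roughly what such a pipeline looks like today (a minimal sketch; the sample data, key selector and class names are mine, purely for illustration):

    import org.apache.flink.api.common.functions.ReduceFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RekeyToday {

        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<Tuple2<String, Integer>> input = env.fromElements(
                    Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

            KeySelector<Tuple2<String, Integer>, String> byWord =
                    new KeySelector<Tuple2<String, Integer>, String>() {
                        @Override
                        public String getKey(Tuple2<String, Integer> value) {
                            return value.f0;
                        }
                    };

            ReduceFunction<Tuple2<String, Integer>> sum =
                    new ReduceFunction<Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> reduce(
                                Tuple2<String, Integer> a, Tuple2<String, Integer> b) {
                            return Tuple2.of(a.f0, a.f1 + b.f1);
                        }
                    };

            // The result of reduce() is a plain DataStream, even though every
            // record still carries its key in field f0. To aggregate on the
            // same key again we must call keyBy() a second time.
            DataStream<Tuple2<String, Integer>> result = input
                    .keyBy(byWord).reduce(sum)
                    .keyBy(byWord).reduce(sum);

            result.print();
            env.execute("re-keying today");
        }
    }

Both aggregations use the same key, yet the second keyBy still introduces a partitioning boundary between the two reduces, so they cannot be chained.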
To address this shortcoming, we propose a new operator, partitionedKeyBy:

    DataStream {
        public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
    }

Semantically, DataStream.partitionedKeyBy(key) is equivalent to DataStream.keyBy(partitionedKey), where partitionedKey is key plus the task id as an extra field. This guarantees that records from different tasks will never produce the same keys. With this, it's possible to write

    ds.keyBy(key1).reduce(func1)
      .partitionedKeyBy(key1).reduce(func2)
      .partitionedKeyBy(key2).reduce(func3);

Most importantly, in certain cases we will be able to chain these into a single vertex.

Please share your thoughts. The JIRA is at
https://issues.apache.org/jira/browse/FLINK-4855

Xiaowei
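
P.S. To make the intended semantics more concrete, here is one way the described behaviour can be emulated with today's APIs: tag every record with the index of the subtask that produced it, then key by the composite (key, subtask index). The class and method names are mine and this is only a sketch of the semantics; because it still goes through a regular keyBy, it does not give the chaining benefit the proposed operator is meant to enable.

    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.api.java.functions.KeySelector;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.api.java.tuple.Tuple3;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.KeyedStream;

    public class PartitionedKeyByEmulation {

        // Emulates ds.partitionedKeyBy(key) for a Tuple2<String, Integer>
        // stream keyed on field f0: the effective key is (f0, subtask index).
        public static KeyedStream<Tuple3<String, Integer, Integer>, Tuple2<String, Integer>>
                emulate(DataStream<Tuple2<String, Integer>> ds) {

            // Tag each record with the index of the subtask running this map.
            // With default chaining this is the task that produced the record.
            DataStream<Tuple3<String, Integer, Integer>> tagged = ds.map(
                    new RichMapFunction<Tuple2<String, Integer>,
                                        Tuple3<String, Integer, Integer>>() {
                        @Override
                        public Tuple3<String, Integer, Integer> map(
                                Tuple2<String, Integer> value) {
                            int subtask = getRuntimeContext().getIndexOfThisSubtask();
                            return Tuple3.of(value.f0, subtask, value.f1);
                        }
                    });

            // Key by (original key, subtask index): records coming from
            // different tasks can never collide on this composite key.
            return tagged.keyBy(
                    new KeySelector<Tuple3<String, Integer, Integer>,
                                    Tuple2<String, Integer>>() {
                        @Override
                        public Tuple2<String, Integer> getKey(
                                Tuple3<String, Integer, Integer> value) {
                            return Tuple2.of(value.f0, value.f1);
                        }
                    });
        }
    }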