Hi Xiaowei,

I like the idea of reusing an existing partitioning and thus saving a shuffle
operation. It would be great if we could fail at runtime in case the
partitioning has somehow changed. That way a logical user error won't go
unnoticed.
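[Editor's note: a minimal, self-contained sketch of such a runtime check.
It uses plain modulo hashing as a simplified stand-in for Flink's actual
key-group assignment, and the method names (`expectedTask`,
`verifyPartitioning`) are hypothetical, not Flink API.]

```java
public class PartitionCheckDemo {
    // Simplified partitioning model: hash the key, modulo the parallelism.
    // Flink's real assignment goes through key groups, but the check is analogous.
    static int expectedTask(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    // Hypothetical guard a task could run on every incoming record:
    // fail fast if the record's key no longer maps to this task's index.
    static void verifyPartitioning(Object key, int thisTaskIndex, int parallelism) {
        if (expectedTask(key, parallelism) != thisTaskIndex) {
            throw new IllegalStateException(
                "Key " + key + " is not partitioned to task " + thisTaskIndex);
        }
    }

    public static void main(String[] args) {
        int parallelism = 4;
        String key = "user-42";
        int owner = expectedTask(key, parallelism);

        // On the owning task the check passes silently.
        verifyPartitioning(key, owner, parallelism);

        // On any other task the stale partitioning is detected immediately.
        try {
            verifyPartitioning(key, (owner + 1) % parallelism, parallelism);
        } catch (IllegalStateException e) {
            System.out.println("detected mispartitioned record");
        }
    }
}
```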

Would it make sense to name the method partitionedByKey(...) because the
data is already partitioned?

Cheers,
Till

On Thu, Oct 20, 2016 at 9:53 AM, Xiaowei Jiang <xiaow...@gmail.com> wrote:

> After we do any interesting operation (e.g. reduce) on a KeyedStream, the
> result becomes a DataStream. In many cases, the output still has the same
> or compatible keys as the KeyedStream (logically). But to do further
> operations on these keys, we are forced to use keyBy again. This works
> semantically, but is costly in two ways. First, it destroys the
> possibility of chaining, which is one of the most important optimization
> techniques. Second, keyBy will greatly expand the connected components of
> tasks, which has implications for failover optimization.
>
> To address this shortcoming, we propose a new operator partitionedKeyBy.
>
> DataStream {
>     public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
> }
>
> Semantically, DataStream.partitionedKeyBy(key) is equivalent to
> DataStream.keyBy(partitionedKey) where partitionedKey is key plus the
> taskid as an extra field. This guarantees that records from different tasks
> will never produce the same keys.
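[Editor's note: a self-contained sketch of the composite-key idea described
above, modeled outside of Flink. `TaskKey` is a hypothetical name for the
"key plus taskid" pair, not part of the proposed API.]

```java
import java.util.Objects;

// Hypothetical composite key: the user key plus the id of the task that emitted it.
class TaskKey<K> {
    final K userKey;
    final int taskId;

    TaskKey(K userKey, int taskId) {
        this.userKey = userKey;
        this.taskId = taskId;
    }

    @Override
    public boolean equals(Object o) {
        if (!(o instanceof TaskKey)) return false;
        TaskKey<?> other = (TaskKey<?>) o;
        return Objects.equals(userKey, other.userKey) && taskId == other.taskId;
    }

    @Override
    public int hashCode() {
        return Objects.hash(userKey, taskId);
    }
}

public class PartitionedKeyDemo {
    public static void main(String[] args) {
        // The same logical key emitted by two different tasks yields two
        // distinct composite keys, so downstream keyed state can never mix
        // records from different upstream tasks.
        TaskKey<String> fromTask0 = new TaskKey<>("user-42", 0);
        TaskKey<String> fromTask1 = new TaskKey<>("user-42", 1);
        System.out.println(fromTask0.equals(fromTask1)); // false
    }
}
```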
>
> With this, it's possible to do
>
> ds.keyBy(key1).reduce(func1)
>     .partitionedKeyBy(key1).reduce(func2)
>     .partitionedKeyBy(key2).reduce(func3);
>
> Most importantly, in certain cases, we will be able to chain these into a
> single vertex.
>
> Please share your thoughts. The JIRA is at
> https://issues.apache.org/jira/browse/FLINK-4855
>
> Xiaowei
>
