[
https://issues.apache.org/jira/browse/FLINK-4855?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17323700#comment-17323700
]
Flink Jira Bot commented on FLINK-4855:
---------------------------------------
This issue is assigned but has not received an update in 7 days so it has been
labeled "stale-assigned". If you are still working on the issue, please give an
update and remove the label. If you are no longer working on the issue, please
unassign so someone else may work on it. In 7 days the issue will be
automatically unassigned.
> Add partitionedKeyBy to DataStream
> ----------------------------------
>
> Key: FLINK-4855
> URL: https://issues.apache.org/jira/browse/FLINK-4855
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Reporter: Xiaowei Jiang
> Assignee: Guowei Ma
> Priority: Major
> Labels: stale-assigned
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> After we do any interesting operations (e.g. reduce) on KeyedStream, the
> result becomes DataStream. In a lot of cases, the output still has the same
> or compatible keys with the KeyedStream (logically). But to do further
> operations on these keys, we are forced to use keyby again. This works
> semantically, but is costly in two aspects. First, it destroys the
> possibility of chaining, which is one of the most important optimization
> technique. Second, keyby will greatly expand the connected components of
> tasks, which has implications in failover optimization.
> To address this shortcoming, we propose a new operator partitionedKeyBy.
> DataStream {
> public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
> }
> Semantically, DataStream.partitionedKeyBy(key) is equivalent to
> DataStream.keyBy(partitionedKey) where partitionedKey is key plus the taskid
> as an extra field. This guarantees that records from different tasks will
> never produce the same keys.
> With this, it's possible to do
> ds.keyBy(key1).reduce(func1)
> .partitionedKeyBy(key1).reduce(func2)
> .partitionedKeyBy(key2).reduce(func3);
> Most importantly, in certain cases, we will be able to chains these into a
> single vertex.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)