[
https://issues.apache.org/jira/browse/FLINK-4855?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Flink Jira Bot updated FLINK-4855:
----------------------------------
Labels: auto-deprioritized-major auto-unassigned stale-minor (was:
auto-deprioritized-major auto-unassigned)
I am the [Flink Jira Bot|https://github.com/apache/flink-jira-bot/] and I help
the community manage its development. I see this issue has been marked as
Minor but is unassigned and neither itself nor its Sub-Tasks have been updated
for 180 days. I have gone ahead and marked it "stale-minor". If this ticket is
still Minor, please either assign yourself or give an update. Afterwards,
please remove the label or in 7 days the issue will be deprioritized.
> Add partitionedKeyBy to DataStream
> ----------------------------------
>
> Key: FLINK-4855
> URL: https://issues.apache.org/jira/browse/FLINK-4855
> Project: Flink
> Issue Type: Improvement
> Components: API / DataStream
> Reporter: Xiaowei Jiang
> Priority: Minor
> Labels: auto-deprioritized-major, auto-unassigned, stale-minor
> Original Estimate: 168h
> Remaining Estimate: 168h
>
> After we do any interesting operations (e.g. reduce) on KeyedStream, the
> result becomes DataStream. In a lot of cases, the output still has the same
> or compatible keys with the KeyedStream (logically). But to do further
> operations on these keys, we are forced to use keyBy again. This works
> semantically, but is costly in two respects. First, it destroys the
> possibility of chaining, which is one of the most important optimization
> techniques. Second, keyBy will greatly expand the connected components of
> tasks, which has implications for failover optimization.
> To address this shortcoming, we propose a new operator partitionedKeyBy.
> DataStream {
> public <K> KeyedStream<T, K> partitionedKeyBy(KeySelector<T, K> key)
> }
> Semantically, DataStream.partitionedKeyBy(key) is equivalent to
> DataStream.keyBy(partitionedKey) where partitionedKey is key plus the task ID
> as an extra field. This guarantees that records from different tasks will
> never produce the same keys.
> With this, it's possible to do
> ds.keyBy(key1).reduce(func1)
> .partitionedKeyBy(key1).reduce(func2)
> .partitionedKeyBy(key2).reduce(func3);
> Most importantly, in certain cases, we will be able to chain these into a
> single vertex.
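
The equivalence described in the issue (partitionedKeyBy(key) behaving like keyBy on "key plus the task ID") could be sketched in plain Java as below. This is only an illustration under stated assumptions, not Flink code and not the proposed implementation: PartitionedKey, PartitionedKeys.forTask, and the taskIndex parameter are hypothetical names, and java.util.function.Function stands in for Flink's KeySelector.

```java
import java.util.Objects;
import java.util.function.Function;

/** Hypothetical composite key: the user-defined key plus the index of the producing task. */
final class PartitionedKey<K> {
    final K key;
    final int taskIndex;

    PartitionedKey(K key, int taskIndex) {
        this.key = key;
        this.taskIndex = taskIndex;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (!(o instanceof PartitionedKey)) {
            return false;
        }
        PartitionedKey<?> other = (PartitionedKey<?>) o;
        // Two keys match only if both the user key and the task index match.
        return taskIndex == other.taskIndex && Objects.equals(key, other.key);
    }

    @Override
    public int hashCode() {
        return Objects.hash(key, taskIndex);
    }

    @Override
    public String toString() {
        return "(" + key + ", task " + taskIndex + ")";
    }
}

/** Hypothetical helper: wraps a key extractor so that it also records the task index. */
final class PartitionedKeys {
    private PartitionedKeys() {}

    static <T, K> Function<T, PartitionedKey<K>> forTask(Function<T, K> keySelector, int taskIndex) {
        // Assumption: taskIndex identifies the parallel task that emits the element.
        return element -> new PartitionedKey<>(keySelector.apply(element), taskIndex);
    }
}
```

With such a key, two elements that share a user key but come from different tasks never compare equal, which is the guarantee the issue relies on: each task's output can be keyed without mixing it with records from other tasks.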
--
This message was sent by Atlassian Jira
(v8.20.1#820001)