Gyula Fora created FLINK-2824:
---------------------------------
Summary: Iteration feedback partitioning does not work as expected
Key: FLINK-2824
URL: https://issues.apache.org/jira/browse/FLINK-2824
Project: Flink
Issue Type: Bug
Components: Streaming
Reporter: Gyula Fora
Priority: Blocker
Iteration feedback partitioning is not handled transparently: unless the user
knows the implementation details of streaming iterations (which is not a
realistic expectation), the feedback can silently end up partitioned incorrectly.
Example:
IterativeStream it = ... // parallelism 1
DataStream mapped = it.map(...) // parallelism 2
// This does not work, as the feedback has parallelism 2 != 1:
// it.closeWith(mapped.partitionByHash(someField));
// so we need to rebalance the data to parallelism 1 first:
it.closeWith(mapped.map(new NoOpMap()).setParallelism(1).partitionByHash(someField));
This program executes, but the feedback is not hash-partitioned across the
mapper instances. The hash partitioning is applied between the NoOpMap and the
iteration sink, whose parallelism differs from that of the mapper (1 vs. 2),
and the iteration source then forwards every element to the same mapper
instance (always instance 0).
In short, the iteration source/sink pair gets the parallelism of the input
stream (p = 1) instead of that of the head operator (p = 2), which leads to
incorrect partitioning.
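The routing can be illustrated with a simplified Java model of the two hops. This is not Flink code: the class and method names are hypothetical, hash partitioning is approximated as hashCode() modulo the number of channels, and the source-to-head hop is modeled as the report describes it, with source instance i forwarding to mapper instance i.

```java
import java.util.Arrays;

// Hypothetical, simplified model of feedback routing in the FLINK-2824 example.
// Not Flink code: it only mimics the two hops described in the report.
class FeedbackRoutingSketch {

    // Simplified hash partitioning: hashCode() modulo the number of channels.
    static int hashPartition(int key, int numChannels) {
        return Math.floorMod(Integer.hashCode(key), numChannels);
    }

    // Counts how many feedback records each head (mapper) instance receives.
    // The iteration sink and source share iterationParallelism.
    static int[] feedbackPerMapper(int[] keys, int iterationParallelism, int headParallelism) {
        if (iterationParallelism > headParallelism) {
            throw new IllegalArgumentException(
                "model assumes iterationParallelism <= headParallelism");
        }
        int[] received = new int[headParallelism];
        for (int key : keys) {
            // Hop 1: NoOpMap -> iteration sink, hash-partitioned over the
            // sink's parallelism, not the mapper's.
            int sinkInstance = hashPartition(key, iterationParallelism);
            // The sink passes the record to the source instance with the same index.
            int sourceInstance = sinkInstance;
            // Hop 2: source instance i forwards to mapper instance i.
            received[sourceInstance]++;
        }
        return received;
    }

    public static void main(String[] args) {
        int[] keys = {0, 1, 2, 3, 4, 5, 6, 7};
        // Current behavior: source/sink get the input parallelism (1), the head has 2.
        System.out.println(Arrays.toString(feedbackPerMapper(keys, 1, 2))); // [8, 0]
        // Source/sink at the head parallelism (2).
        System.out.println(Arrays.toString(feedbackPerMapper(keys, 2, 2))); // [4, 4]
    }
}
```

With the source/sink at the input parallelism (1), all eight records reach mapper instance 0; giving the source/sink the head parallelism (2) spreads them evenly across both mapper instances.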
Workaround:
Set the parallelism of the iteration input to match that of the head operator.
Suggested solution:
The iteration construction should be reworked to set the parallelism of the
source/sink to the parallelism of the head operator (and to validate that all
heads have the same parallelism).
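A minimal sketch of the suggested check, with hypothetical names and not taken from the Flink code base: derive the source/sink parallelism from the head operators and reject an iteration whose heads disagree.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the validation suggested in FLINK-2824.
// Not actual Flink code.
class IterationParallelismCheck {

    // Returns the parallelism the iteration source/sink should use,
    // i.e. the common parallelism of all head operators.
    static int sourceSinkParallelism(List<Integer> headParallelisms) {
        if (headParallelisms.isEmpty()) {
            throw new IllegalArgumentException("Iteration has no head operators");
        }
        int expected = headParallelisms.get(0);
        for (int p : headParallelisms) {
            if (p != expected) {
                throw new IllegalStateException(
                    "All iteration heads must have the same parallelism, found "
                        + expected + " and " + p);
            }
        }
        return expected;
    }

    public static void main(String[] args) {
        System.out.println(sourceSinkParallelism(Arrays.asList(2, 2))); // 2
        try {
            sourceSinkParallelism(Arrays.asList(2, 1));
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Failing at construction time turns the silent mis-partitioning into an explicit error, while the common case (all heads at the same parallelism) simply propagates that value to the source/sink.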
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)