Hey,

This question is mainly targeted at Aljoscha, but maybe someone else can help me out here:
I think the way feedback partitioning is handled does not work. Let me illustrate with a simple example:

    IterativeStream it = ... (parallelism 1)
    DataStream mapped = it.map(...) (parallelism 2)

    // this does not work, as the feedback has parallelism 2 != 1
    // it.closeWith(mapped.partitionByHash(someField))

    // so we need to rebalance the data
    it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))

This program will execute, but the feedback will not be partitioned by hash to the mapper instances. The partitioning is set on the connection from the noOpMap to the iteration sink, which has a parallelism different from the mapper's (1 vs. 2), and the iteration source then forwards each element to the mapper (always to instance 0).

So the problem is basically that the iteration source/sink pair gets the parallelism of the input stream (p = 1), not that of the head operator (p = 2), which leads to incorrect partitioning.

Did I miss something here?

Cheers,
Gyula
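
The routing described above can be modelled with a small toy sketch. This is plain Java, not Flink code: the class and method names are hypothetical, the modulo hash is only a stand-in for whatever hash function the real partitioner uses, and the "observed" path assumes the behaviour exactly as described in this message (hash partitioning applied towards a sink with parallelism 1, then a forward from source subtask i to mapper instance i).

```java
public class FeedbackRoutingSketch {

    // Stand-in for hash partitioning: pick one of numChannels receivers by key hash.
    static int hashChannel(Object key, int numChannels) {
        return Math.floorMod(key.hashCode(), numChannels);
    }

    // Behaviour as described: the hash partitioner selects among the iteration
    // sink subtasks, and the matching source subtask forwards to the mapper
    // instance with the same index.
    static int observedMapperIndex(Object key, int sinkParallelism) {
        int sinkSubtask = hashChannel(key, sinkParallelism);
        return sinkSubtask; // forward: source subtask i -> mapper instance i
    }

    // Intended behaviour: the hash partitioner selects among the mapper instances.
    static int expectedMapperIndex(Object key, int mapperParallelism) {
        return hashChannel(key, mapperParallelism);
    }

    public static void main(String[] args) {
        int sinkParallelism = 1;   // iteration source/sink inherit p = 1
        int mapperParallelism = 2; // head operator runs with p = 2
        for (int key = 0; key < 4; key++) {
            System.out.println("key=" + key
                    + " observed=" + observedMapperIndex(key, sinkParallelism)
                    + " expected=" + expectedMapperIndex(key, mapperParallelism));
        }
    }
}
```

With a sink parallelism of 1 the observed index is 0 for every key, whereas hashing over the mapper's parallelism would spread the keys across both instances.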