Hey,

This question is mainly targeted towards Aljoscha but maybe someone can
help me out here:

I think the way feedback partitioning is handled does not work. Let me
illustrate with a simple example:

IterativeStream it = ... (parallelism 1)
DataStream mapped = it.map(...) (parallelism 2)
// this does not work because the feedback has parallelism 2 != 1
// it.closeWith(mapped.partitionByHash(someField))
// so we need to rebalance the data to parallelism 1 first
it.closeWith(mapped.map(NoOpMap).setParallelism(1).partitionByHash(someField))

This program will execute, but the feedback will not be partitioned by hash
across the mapper instances. The partitioning is applied between the NoOpMap
and the iteration sink, which has a different parallelism from the mapper
(1 vs. 2). The iteration source then forwards every element to the mapper,
always to instance 0.

So the problem is basically that the iteration source/sink pair gets the
parallelism of the input stream (p = 1), not that of the head operator
(p = 2), which leads to incorrect partitioning.
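
To make the routing concrete, here is a small plain-Java model of what I
believe happens. It is not Flink code: the class and method names are made
up for illustration, and it assumes that hash partitioning picks a channel
as key hash modulo the receiver's parallelism and that forwarding sends
from source subtask i to head subtask i. It only counts how many feedback
elements each mapper instance would receive.

```java
import java.util.Arrays;

public class FeedbackRoutingSketch {

    // Assumed hash partitioner: channel = hash(key) mod number of receivers.
    static int hashChannel(int key, int numChannels) {
        return Math.floorMod(Integer.hashCode(key), numChannels);
    }

    // Counts the feedback elements arriving at each head (mapper) subtask.
    // The hash partitioning targets the iteration sink; the iteration source
    // with the same subtask index then forwards to the head subtask with
    // that index (assumption of this sketch).
    static int[] route(int[] keys, int sinkParallelism, int headParallelism) {
        if (sinkParallelism > headParallelism) {
            throw new IllegalArgumentException("sketch only covers sink <= head parallelism");
        }
        int[] received = new int[headParallelism];
        for (int key : keys) {
            int sinkSubtask = hashChannel(key, sinkParallelism);
            int headSubtask = sinkSubtask; // forward: same index, no re-partitioning
            received[headSubtask]++;
        }
        return received;
    }

    public static void main(String[] args) {
        int[] keys = {0, 1, 2, 3, 4, 5};
        // Source/sink inherit the input parallelism (1): everything lands on mapper 0.
        System.out.println(Arrays.toString(route(keys, 1, 2))); // prints [6, 0]
        // Source/sink take the head operator's parallelism (2): keys are spread by hash.
        System.out.println(Arrays.toString(route(keys, 2, 2))); // prints [3, 3]
    }
}
```

With the sink at parallelism 1 the hash partitioner has only one channel to
choose from, so the partitioning is effectively lost before the elements
reach the mapper.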

Did I miss something here?

Cheers,
Gyula
