Hi!

Flink has a way to break pipelines that does not rely on the batch exchange; by now it is quite specific to iterations. It is used there for constructs that fork and re-join the flow.
The proper batch exchange is better, because the scheduler can exploit it, but it is not yet usable in iterations.

Stephan


On Mon, Feb 1, 2016 at 1:04 PM, Fridtjof Sander <fsan...@mailbox.tu-berlin.de> wrote:

> Hi Fabian,
>
> thanks for your explanation!
>
> Yeah, I figured that if an easy fix existed, you would have done that
> yourself. This is more for me to understand the conceptual problem.
>
> But back to the pipeline requirement: Doesn't zipWithIndex violate that
> too, then? It's also a mapPartition, collect + broadcast, plus another
> mapPartition. This should roughly be the same procedure as building a
> histogram and propagating partition boundaries, right? Not much going on
> there with pipelining. However, I had no problems with zipWithIndex inside
> iterations.
>
> Also, is there a difference between the "materialization" you mentioned
> and the execution of a data sink operator?
>
> Again, if all that is written somewhere, just throw me the link, I don't
> want to waste your time.
>
> Best
> Fridtjof
>
> On February 1, 2016 12:21:21 CET, Fabian Hueske <fhue...@gmail.com> wrote:
> >Hi Fridtjof,
> >
> >the range partitioner works by building a histogram for the partitioning
> >key. This requires a pass over the whole intermediate data set, which
> >means it needs to be materialized and cannot be processed in a pipelined
> >fashion. However, pipelined data exchange strategies are a requirement
> >for the data flows which are executed for iteration bodies.
> >
> >This is nothing that can be easily fixed at the moment. Touching this
> >part of the runtime code would have major implications.
> >I'm afraid we have to accept this restriction.
> >
> >Best, Fabian
> >
> >
> >2016-02-01 11:47 GMT+01:00 Fridtjof Sander <fsan...@mailbox.tu-berlin.de>:
> >
> >> Dear Flink-Devs,
> >>
> >> I recently ran into a problem where range-partitioning within
> >> iterations would be useful.
> >>
> >> In the PR for range-partitioning it is said that this doesn't work
> >> because of some batched data-exchange mode.
> >> https://github.com/apache/flink/pull/1255
> >>
> >> I would like to understand the issue with that, but could not find
> >> articles/blog posts/etc. to read about that.
> >>
> >> Do you have some pointers for me? Code will also work if the concept
> >> gets clear from it.
> >>
> >> Thanks for your time!
> >>
> >> Best, Fridtjof
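
To make the comparison in this thread concrete, here is a minimal sketch of the two procedures in plain Python. It does not use the Flink API: partitions are modeled as ordinary lists, and the function names (zip_with_index, range_boundaries, range_partition) are hypothetical and chosen only for illustration. The histogram is simplified to a full sort of all keys, which is an assumption and not necessarily how Flink's range partitioner computes its boundaries. What the sketch does show is that both procedures need a complete first pass over the data before the second pass can start.

```python
from bisect import bisect_right
from itertools import accumulate


def zip_with_index(partitions):
    """Assign consecutive global indices to the elements of all partitions."""
    # Pass 1 (the first mapPartition): each partition counts its elements.
    counts = [len(p) for p in partitions]
    # "collect + broadcast": all counts become known everywhere, so each
    # partition can derive its starting offset as a prefix sum.
    offsets = [0] + list(accumulate(counts))[:-1]
    # Pass 2 (the second mapPartition): number elements from the offset.
    return [
        [(offset + i, value) for i, value in enumerate(p)]
        for p, offset in zip(partitions, offsets)
    ]


def range_boundaries(partitions, num_ranges):
    """Pick num_ranges - 1 split keys so that the ranges are roughly equal.

    Simplification (assumption): sort all keys instead of building a
    histogram. Either way, every key must be seen before any boundary
    is known, so the input has to be fully materialized first.
    """
    keys = sorted(key for p in partitions for key in p)
    return [keys[(i * len(keys)) // num_ranges] for i in range(1, num_ranges)]


def range_partition(partitions, boundaries):
    """Route each key to the range that its value falls into."""
    ranges = [[] for _ in range(len(boundaries) + 1)]
    for p in partitions:
        for key in p:
            ranges[bisect_right(boundaries, key)].append(key)
    return ranges


if __name__ == "__main__":
    data = [[5, 1], [9, 3, 7], [2]]
    print(zip_with_index(data))
    bounds = range_boundaries(data, 3)
    print(bounds, range_partition(data, bounds))
```

In both cases the second pass depends on a value (the offsets or the boundaries) that only exists once the first pass has consumed its entire input. That dependency is the point at which the pipeline has to be broken.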