I am not sure if I got everything right.

Let me first explain how a Reduce operator is executed in Flink at the
moment:

Assume a data set is randomly distributed across four machines and not
sorted.
When a Reduce function is applied on this data set, each of the four
machines runs a Combine operator (the operator that applies the Reduce function).
The combine operator will receive a certain amount of memory, say 256MB.
This memory will be filled with a subset of the data. The subset is sorted
in memory on the Reduce operator's grouping key and afterwards forwarded to
the Reduce function, which pair-wise processes all elements with the same
key. For each unique grouping key in the memory buffer, there will be only
one result element. The memory buffer is filled again, sorted, and combined
until all local data has been combined. All resulting elements are then
hash-partitioned and sent to the corresponding node. Once this is done, all
elements with the same grouping key are on the same machine. The data is
now fully sorted (not only the data within a single memory buffer) and
again pair-wise reduced.
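To make the local sort-combine step above concrete, here is a minimal plain-Java sketch (not Flink's actual implementation; the buffer of (key, value) pairs and the `sortCombine` method are illustrative assumptions): the buffer is sorted on the grouping key and then combined pair-wise, leaving one result element per unique key.

```java
import java.util.*;
import java.util.function.BinaryOperator;

// Hypothetical sketch of the sort-combine step: fill a buffer,
// sort it on the grouping key, then pair-wise combine equal keys.
public class SortCombineSketch {
    // reduceFn plays the role of the user's Reduce function
    static Map<String, Integer> sortCombine(List<Map.Entry<String, Integer>> buffer,
                                            BinaryOperator<Integer> reduceFn) {
        buffer.sort(Map.Entry.comparingByKey());           // sort on the grouping key
        LinkedHashMap<String, Integer> out = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> e : buffer) {
            out.merge(e.getKey(), e.getValue(), reduceFn); // pair-wise combine per key
        }
        return out;                                        // one result element per key
    }
}
```

In the real operator this happens once per filled memory buffer, and the per-key results are what gets hash-partitioned afterwards.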
Your first picture seems to show the local pair-wise reduce on sorted
data. If I understood it correctly, the second picture is on a different
level of abstraction, showing the distributed execution of a tree-reduce.

For highly skewed data, it makes sense to add one or more combine steps
before doing the final full reduce (I believe this is what Kostas is
referring to).
The semantics of such a tree-combined reduce are the same as for the
regular reduce. It would just be executed in a different way. Therefore,
the function interface should remain the same and there is no need to add
an API operator for that (except maybe a hint to the optimizer to enforce a
tree-combine execution strategy).
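The point that the semantics do not change can be illustrated with a plain-Java sketch (not Flink code; both method names are made up for illustration): for an associative reduce function, combining partial results in a tree yields the same final result as a single sequential reduce, so only the execution strategy differs.

```java
import java.util.*;
import java.util.function.BinaryOperator;

// Illustration: a tree of combine steps produces the same result as a
// sequential reduce when the reduce function is associative, which is
// why the function interface can stay unchanged.
public class TreeVsSequential {
    static int sequentialReduce(List<Integer> values, BinaryOperator<Integer> f) {
        int acc = values.get(0);
        for (int i = 1; i < values.size(); i++) acc = f.apply(acc, values.get(i));
        return acc;
    }

    static int treeReduce(List<Integer> values, BinaryOperator<Integer> f) {
        List<Integer> level = new ArrayList<>(values);
        while (level.size() > 1) {                  // each pass is one combine level
            List<Integer> next = new ArrayList<>();
            for (int i = 0; i < level.size(); i += 2) {
                next.add(i + 1 < level.size()
                        ? f.apply(level.get(i), level.get(i + 1))
                        : level.get(i));            // odd element carries over
            }
            level = next;
        }
        return level.get(0);
    }
}
```

Both paths return the same value for any associative function such as sum or max, regardless of how the tree groups the inputs.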

Is this what you are looking for or do you need a new API operator with
different semantics?

Best, Fabian

2015-04-27 11:13 GMT+02:00 Kostas Tzoumas <ktzou...@apache.org>:

> Some form of tree aggregation is useful in many cases, and IMO a good
> addition to the system.
>
> Kostas
>
> On Mon, Apr 27, 2015 at 11:04 AM, Andra Lungu <lungu.an...@gmail.com>
> wrote:
>
> > Hi Fabian,
> >
> > After a quick look at the current behaviour of Flink's combinable
> reduce, I
> > saw that it does something like this:
> >
> >
> https://docs.google.com/drawings/d/1WfGJq1ZNQ-F0EQZ2TwEYS_861xc3fSdfJL9Z4VdXBQU/edit?usp=sharing
> >
> > It basically iterates over all the key groups two by two and, if it finds
> > two keys within the same key group, reduces them.
> >
> > What we would like to do (as Vasia said) as part of my thesis would be to
> > speed up the regular reduce by turning it into a treeReduce() where
> > appropriate [in GSA, for example, we do a reduce; that could easily be
> > hot-swapped with treeReduce() when skewed vertices are detected]. This
> new
> > operator will get the number of levels as an additional parameter (val1,
> > val2, numLevels) and will aggregate in levels:
> >
> >
> https://docs.google.com/drawings/d/1X_yJBdZykB9oBTbACUy9Bdd7oG5eDPDicNKhPcZ71ik/edit?usp=sharing
> >
> > The goal is to make computation for highly skewed graphs scale. If we map
> > one of the first nodes in the drawing to a vertex with high in-degree, it
> > will slow down computation with the first reduce approach. But I am sure
> we
> > could find many other use cases.
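The level-based aggregation proposed in the quoted message could be sketched as follows in plain Java (the `numLevels` parameter comes from the proposal; `combineInLevels` is a made-up name, not an existing Flink API): each level pairs up neighbouring partial results, so `numLevels` rounds shrink the input before the final full reduce.

```java
import java.util.*;
import java.util.function.BinaryOperator;

// Hypothetical sketch of the proposed treeReduce: run numLevels combine
// rounds, each roughly halving the number of partial results, before
// handing the remainder to the final full reduce.
public class TreeReduceLevels {
    static List<Integer> combineInLevels(List<Integer> partials,
                                         BinaryOperator<Integer> f,
                                         int numLevels) {
        for (int level = 0; level < numLevels && partials.size() > 1; level++) {
            List<Integer> next = new ArrayList<>();
            for (int i = 0; i < partials.size(); i += 2) {
                next.add(i + 1 < partials.size()
                        ? f.apply(partials.get(i), partials.get(i + 1))
                        : partials.get(i));   // odd element carries over
            }
            partials = next;                  // one level completed
        }
        return partials;                      // remainder goes to the final reduce
    }
}
```

In a distributed setting the interesting part, as noted below, is ensuring the partial combines of each level actually run on different machines.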
> >
> > Now, in order to write the treeReduce operator, I did some
> investigation.
> > It does not suffice to make reduce's run() method operate on levels, you
> > also need to ensure that the partial reduces in the levels are executed
> on
> > different machines. This is where the tricky and fun part begins. How do
> > you know which reduce is executed on which machine? In which class is
> this
> > described?
> >
> > I sure would hate to do duplicate work and since this is the first time I
> > had a look at Flink's internals, I could also use some guidance.
> >
> >
> >
> >
> > On Mon, Apr 27, 2015 at 10:36 AM, Vasiliki Kalavri <
> > vasilikikala...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > Andra is working on a modified reduce operator, which would internally
> > > create an aggregation tree.
> > > This is work related to her thesis and we want to use it in graph
> > > computations for skewed inputs.
> > > It might or might not be a good idea to add it as a Flink operator and
> we
> > > will need to evaluate that (as part of the thesis), so we don't have a
> > JIRA
> > > for this :-)
> > >
> > > -Vasia.
> > >
> > > On 27 April 2015 at 10:20, Fabian Hueske <fhue...@gmail.com> wrote:
> > >
> > > > Hi Andra,
> > > >
> > > > is there a JIRA for the new runtime operator?
> > > >
> > > > Adding a new operator is a lot of work and touches many core parts of
> > the
> > > > system.
> > > > It would be good to start a discussion about that early in the
> process
> > to
> > > > make sure that the design is aligned with the system.
> > > > Otherwise, duplicated work might be necessary before it can be added
> to
> > > the
> > > > system.
> > > >
> > > > Cheers,
> > > > Fabian
> > > >
> > > > 2015-04-26 13:05 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> > > >
> > > > > Yes Markus,
> > > > >
> > > > > ds.reduce() -> AllReduceDriver
> > > > > ds.groupBy().reduce() -> ReduceDriver
> > > > >
> > > > > It's very intuitive ;)
> > > > >
> > > > > On Sun, Apr 26, 2015 at 12:34 PM, Markus Holzemer <
> > > > > holzemer.mar...@googlemail.com> wrote:
> > > > >
> > > > > > Hey Andra,
> > > > > > perhaps you are looking at the wrong ReduceDriver?
> > > > > > As you can see in the DriverStrategy enum, there are several
> > different
> > > > > > ReduceDrivers depending on the strategy the optimizer chooses.
> > > > > >
> > > > > > best,
> > > > > > Markus
> > > > > >
> > > > > > 2015-04-26 12:26 GMT+02:00 Andra Lungu <lungu.an...@gmail.com>:
> > > > > >
> > > > > > > Hey guys,
> > > > > > >
> > > > > > > I am trying to add a new runtime operator;
> > > > > > > To this end, I am following the guide here:
> > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
> http://ci.apache.org/projects/flink/flink-docs-master/internals/add_operator.html
> > > > > > > and the code itself.
> > > > > > >
> > > > > > >
> > > > > > > From what I understood, the run() in ReduceDriver, for
> instance,
> > > > should
> > > > > > be
> > > > > > > called every time a reduce() is called. However, I added a
> > > breakpoint
> > > > > in
> > > > > > > ReduceDriver's run method on the first if and called reduce()
> on
> > a
> > > > > > DataSet.
> > > > > > > When debugging, it seems that the method is not called; I also
> > > tried
> > > > > > adding
> > > > > > > a log.info() there. That doesn't get printed either...
> > Obviously,
> > > > the
> > > > > > same
> > > > > > > goes for System.out.println.
> > > > > > >
> > > > > > > Could someone explain the workflow a bit better? When exactly
> > does
> > > > > run()
> > > > > > > get called and what is ReduceDriver's role?
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Andra
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
