Hi everybody,

I found an issue with my first approach, so I couldn't run the
experiments yet. In the design document I have summarized my ideas and
the work of the last weeks on this issue.

You can find the design document here:
https://docs.google.com/document/d/1odYIvmQt4feonQF9q-btBnGvrzzN3lX0Os6rzHcCOjA/edit?usp=sharing

I highly appreciate any ideas or comments, and I am looking forward to
the discussion so that we can finally solve this issue :)

Best regards,
Felix

2016-07-08 1:47 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:

> Hi,
>
> I have already started to work on this issue and created a JIRA for it:
> https://issues.apache.org/jira/browse/FLINK-4175
> I have also implemented a quick version which could solve it. I will
> run the experiments on the cluster first and describe my approach on
> Monday :)
>
> Have a nice weekend,
> Felix
>
> P.S. for super curious people:
> https://github.com/FelixNeutatz/incubator-flink/commit/7d79d4dfe3f18208a73d6b692b3909f9c69a1da7
>
> 2016-06-09 11:50 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:
>
>> Hi everybody,
>>
>> could we use the org.apache.flink.api.common.cache.DistributedCache to
>> work around this broadcast issue for the moment, until we fix it?
>> Or do you think it won't scale either?
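>>
>> Roughly, I am thinking of something along the following lines (just a
>> sketch; the HDFS path and the name "bcData" are placeholders):
>>
>> import java.io.File;
>> import java.nio.file.Files;
>> import java.util.List;
>>
>> import org.apache.flink.api.common.functions.RichMapFunction;
>> import org.apache.flink.api.java.DataSet;
>> import org.apache.flink.api.java.ExecutionEnvironment;
>> import org.apache.flink.configuration.Configuration;
>>
>> public class DistributedCacheSketch {
>>   public static void main(String[] args) throws Exception {
>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>     // Ship the "broadcast" data as a cached file; it is copied to each TM once.
>>     env.registerCachedFile("hdfs:///tmp/bc-data.txt", "bcData");
>>
>>     DataSet<String> input = env.fromElements("a", "b", "c");
>>     input.map(new RichMapFunction<String, String>() {
>>       private List<String> bcLines;
>>
>>       @Override
>>       public void open(Configuration parameters) throws Exception {
>>         // Read the TM-local copy of the file instead of a broadcast variable.
>>         File file = getRuntimeContext().getDistributedCache().getFile("bcData");
>>         bcLines = Files.readAllLines(file.toPath());
>>       }
>>
>>       @Override
>>       public String map(String value) {
>>         return value + "/" + bcLines.size();
>>       }
>>     }).print();
>>   }
>> }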
>>
>> Best regards,
>> Felix
>>
>> 2016-06-09 10:57 GMT+02:00 Stephan Ewen <se...@apache.org>:
>>
>>> Till is right. Broadcast joins currently materialize once per slot.
>>> Originally, the purely push-based runtime was not good enough to handle
>>> it differently.
>>>
>>> By now, we could definitely handle BC Vars differently (only one slot
>>> per TM requests the data).
>>> For BC Joins, the hash tables do not coordinate spilling currently, which
>>> means that we cannot do multiple joins through the same hash table.
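>>>
>>> (To be concrete about what I mean by BC Joins: a join where one input is
>>> broadcast to every parallel instance, e.g. via a size hint as in the rough
>>> sketch below; the data sets are just placeholders.)
>>>
>>> import org.apache.flink.api.java.DataSet;
>>> import org.apache.flink.api.java.ExecutionEnvironment;
>>> import org.apache.flink.api.java.tuple.Tuple2;
>>>
>>> public class BroadcastJoinSketch {
>>>   public static void main(String[] args) throws Exception {
>>>     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>>     DataSet<Tuple2<Integer, String>> large =
>>>         env.fromElements(Tuple2.of(1, "a"), Tuple2.of(2, "b"));
>>>     DataSet<Tuple2<Integer, String>> small =
>>>         env.fromElements(Tuple2.of(1, "x"));
>>>
>>>     // Size hint: the second input is tiny, so the optimizer will typically
>>>     // broadcast it and build a hash table from it in every parallel instance.
>>>     large.joinWithTiny(small)
>>>          .where(0)
>>>          .equalTo(0)
>>>          .print();
>>>   }
>>> }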
>>>
>>>
>>> On Thu, Jun 9, 2016 at 10:17 AM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>> > If I'm not mistaken, then broadcast variables and broadcast inputs of
>>> > joins follow different code paths. Broadcast variables use additional
>>> > input channels and are read before the actual driver code runs. In
>>> > contrast to that, a join operation is a two-input operator where the
>>> > join driver decides how to handle the inputs (which one to read first
>>> > as build input).
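>>> >
>>> > To make the broadcast variable path concrete: it is the withBroadcastSet /
>>> > getBroadcastVariable pattern, roughly as in this minimal sketch (names
>>> > are placeholders):
>>> >
>>> > import java.util.List;
>>> >
>>> > import org.apache.flink.api.common.functions.RichMapFunction;
>>> > import org.apache.flink.api.java.DataSet;
>>> > import org.apache.flink.api.java.ExecutionEnvironment;
>>> > import org.apache.flink.configuration.Configuration;
>>> >
>>> > public class BroadcastVariableSketch {
>>> >   public static void main(String[] args) throws Exception {
>>> >     ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
>>> >     DataSet<Integer> bcSet = env.fromElements(1, 2, 3);
>>> >
>>> >     env.fromElements("a", "b", "c")
>>> >        .map(new RichMapFunction<String, String>() {
>>> >          private List<Integer> bcData;
>>> >
>>> >          @Override
>>> >          public void open(Configuration parameters) {
>>> >            // The broadcast set arrives over an additional input channel and
>>> >            // is fully read here, before map() is called for the first time.
>>> >            bcData = getRuntimeContext().getBroadcastVariable("bcSet");
>>> >          }
>>> >
>>> >          @Override
>>> >          public String map(String value) {
>>> >            return value + "/" + bcData.size();
>>> >          }
>>> >        })
>>> >        .withBroadcastSet(bcSet, "bcSet")
>>> >        .print();
>>> >   }
>>> > }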
>>> >
>>> > This also entails that the broadcast variable optimization, where each
>>> > task manager holds the data only once and copies of the data are
>>> > discarded (but they are transmitted multiple times to the TM), does not
>>> > apply to the broadcast join inputs. Here you should see a slightly worse
>>> > performance degradation with your initial benchmark if you increase the
>>> > number of slots.
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <
>>> > alexander.s.alexand...@gmail.com> wrote:
>>> >
>>> > > > As far as I know, the reason why the broadcast variables are
>>> > > > implemented that way is that the senders would have to know which
>>> > > > sub-tasks are deployed to which TMs.
>>> > >
>>> > > As the broadcast variables are realized as additionally attached
>>> > > "broadcast channels", I am assuming that the same behavior will apply
>>> > > for broadcast joins as well.
>>> > >
>>> > > Is this the case?
>>> > >
>>> > > Regards,
>>> > > Alexander
>>> > >
>>> > >
>>> > > 2016-06-08 17:13 GMT+02:00 Kunft, Andreas <
>>> andreas.ku...@tu-berlin.de>:
>>> > >
>>> > > > Hi Till,
>>> > > >
>>> > > > thanks for the fast answer.
>>> > > > I'll think about a concrete way of implementing it and open a JIRA.
>>> > > >
>>> > > > Best
>>> > > > Andreas
>>> > > > ________________________________________
>>> > > > From: Till Rohrmann <trohrm...@apache.org>
>>> > > > Sent: Wednesday, June 8, 2016, 15:53
>>> > > > To: dev@flink.apache.org
>>> > > > Subject: Re: Broadcast data sent increases with # slots per TM
>>> > > >
>>> > > > Hi Andreas,
>>> > > >
>>> > > > your observation is correct. The data is sent to each slot and the
>>> > > > receiving TM only materializes one copy of the data. The rest of the
>>> > > > data is discarded.
>>> > > >
>>> > > > As far as I know, the reason why the broadcast variables are
>>> > > > implemented that way is that the senders would have to know which
>>> > > > sub-tasks are deployed to which TMs. Only then can you decide for
>>> > > > which sub-tasks the data can be sent together. Since the output
>>> > > > emitters are agnostic to the actual deployment, the necessary
>>> > > > information would have to be forwarded to them.
>>> > > >
>>> > > > Another problem is that if you pick one of the sub-tasks to receive
>>> > > > the broadcast set, then you have to make sure that this sub-task has
>>> > > > read and materialized the broadcast set before the other sub-tasks
>>> > > > start working. One could maybe send the broadcast set to one sub-task
>>> > > > first and then, after the BC set has been sent, send a kind of
>>> > > > acknowledge record to all other sub-tasks. That way, the other
>>> > > > sub-tasks would block until the broadcast set has been completely
>>> > > > transmitted. But here one has to make sure that the sub-task
>>> > > > receiving the BC set has been deployed and is not queued up for
>>> > > > scheduling.
>>> > > >
>>> > > > So there are some challenges to solve in order to optimize the BC
>>> > > > sets. Currently, nobody is working on it. If you want to start
>>> > > > working on it, then I would recommend opening a JIRA and writing a
>>> > > > design document for it.
>>> > > >
>>> > > > Cheers,
>>> > > > Till
>>> > > >
>>> > > > On Wed, Jun 8, 2016 at 1:45 PM, Kunft, Andreas <
>>> > > > andreas.ku...@tu-berlin.de> wrote:
>>> > > >
>>> > > > > Hi,
>>> > > > >
>>> > > > >
>>> > > > > we experience an unexpected increase in the amount of data sent
>>> > > > > over the network for broadcasts with an increasing number of
>>> > > > > slots per TaskManager.
>>> > > > >
>>> > > > >
>>> > > > > We provided a benchmark [1]. Increasing the number of slots not
>>> > > > > only increases the size of the data sent over the network but
>>> > > > > also hurts performance, as seen in the preliminary results below.
>>> > > > > In these results, cloud-11 has 25 nodes and ibm-power has 8
>>> > > > > nodes, with the number of slots per node scaled from 1 to 16.
>>> > > > >
>>> > > > >
>>> > > > > +-----------------------+--------------+-------------+
>>> > > > > | suite                 | name         | median_time |
>>> > > > > +=======================+==============+=============+
>>> > > > > | broadcast.cloud-11    | broadcast.01 |        8796 |
>>> > > > > | broadcast.cloud-11    | broadcast.02 |       14802 |
>>> > > > > | broadcast.cloud-11    | broadcast.04 |       30173 |
>>> > > > > | broadcast.cloud-11    | broadcast.08 |       56936 |
>>> > > > > | broadcast.cloud-11    | broadcast.16 |      117507 |
>>> > > > > | broadcast.ibm-power-1 | broadcast.01 |        6807 |
>>> > > > > | broadcast.ibm-power-1 | broadcast.02 |        8443 |
>>> > > > > | broadcast.ibm-power-1 | broadcast.04 |       11823 |
>>> > > > > | broadcast.ibm-power-1 | broadcast.08 |       21655 |
>>> > > > > | broadcast.ibm-power-1 | broadcast.16 |       37426 |
>>> > > > > +-----------------------+--------------+-------------+
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > > > After looking into the code base, it seems that the data is
>>> > > > > de-serialized only once per TM, but the actual data is sent for
>>> > > > > all slots running the operator with broadcast vars and just gets
>>> > > > > discarded in case it is already de-serialized.
>>> > > > >
>>> > > > >
>>> > > > > I do not see a reason why the data can't be shared among the
>>> > > > > slots of a TM and therefore just sent once, but I guess it would
>>> > > > > require quite some changes to how BC sets are handled currently.
>>> > > > >
>>> > > > >
>>> > > > > Are there any future plans regarding this and/or is there
>>> > > > > interest in this "feature"?
>>> > > > >
>>> > > > >
>>> > > > > Best
>>> > > > >
>>> > > > > Andreas
>>> > > > >
>>> > > > >
>>> > > > > [1] https://github.com/TU-Berlin-DIMA/flink-broadcast
>>> > > > >
>>> > > > >
>>> > > > >
>>> > > >
>>> > >
>>> >
>>>
>>
>>
>
