Hi everybody,

I found an issue with my first approach, so I couldn't run the experiments yet. In the design document I summarized my ideas and the work of the last weeks on this issue.
You can find the design document here:
https://docs.google.com/document/d/1odYIvmQt4feonQF9q-btBnGvrzzN3lX0Os6rzHcCOjA/edit?usp=sharing

I highly appreciate any idea or comment and I am looking forward to the discussion to finally solve this issue :)

Best regards,
Felix

2016-07-08 1:47 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:

> Hi,
>
> I already started to work on this issue and therefore created a JIRA:
> https://issues.apache.org/jira/browse/FLINK-4175
> I have already implemented a quick version which could solve it. I will
> run the experiments on the cluster first and will describe my approach on
> Monday :)
>
> Have a nice weekend,
> Felix
>
> P.S. for super curious people:
> https://github.com/FelixNeutatz/incubator-flink/commit/7d79d4dfe3f18208a73d6b692b3909f9c69a1da7
>
> 2016-06-09 11:50 GMT+02:00 Felix Neutatz <neut...@googlemail.com>:
>
>> Hi everybody,
>>
>> could we use the org.apache.flink.api.common.cache.DistributedCache to
>> work around this broadcast issue for the moment, until we fix it?
>> Or do you think it won't scale either?
>>
>> Best regards,
>> Felix
>>
>> 2016-06-09 10:57 GMT+02:00 Stephan Ewen <se...@apache.org>:
>>
>>> Till is right. Broadcast joins currently materialize once per slot.
>>> Originally, the purely push-based runtime was not good enough to handle
>>> it differently.
>>>
>>> By now, we could definitely handle BC vars differently (only one slot
>>> per TM requests the data).
>>> For BC joins, the hash tables do not coordinate spilling currently,
>>> which means that we cannot do multiple joins through the same hash table.
>>>
>>> On Thu, Jun 9, 2016 at 10:17 AM, Till Rohrmann <trohrm...@apache.org>
>>> wrote:
>>>
>>> > If I'm not mistaken, then broadcast variables and broadcast inputs of
>>> > joins follow different code paths. Broadcast variables use additional
>>> > input channels and are read before the actual driver code runs. In
>>> > contrast to that, a join operation is a two-input operator where the
>>> > join driver decides how to handle the inputs (which one to read first
>>> > as the build input).
>>> >
>>> > This also entails that the broadcast variable optimization, where each
>>> > task manager holds the data only once and copies of the data are
>>> > discarded (but they are transmitted multiple times to the TM), does
>>> > not apply to the broadcast join inputs. Here you should see a slightly
>>> > worse performance degradation with your initial benchmark if you
>>> > increase the number of slots.
>>> >
>>> > Cheers,
>>> > Till
>>> >
>>> > On Wed, Jun 8, 2016 at 9:14 PM, Alexander Alexandrov <
>>> > alexander.s.alexand...@gmail.com> wrote:
>>> >
>>> > > > As far as I know, the reason why the broadcast variables are
>>> > > > implemented that way is that the senders would have to know which
>>> > > > sub-tasks are deployed to which TMs.
>>> > >
>>> > > As the broadcast variables are realized as additionally attached
>>> > > "broadcast channels", I am assuming that the same behavior will
>>> > > apply for broadcast joins as well.
>>> > >
>>> > > Is this the case?
>>> > >
>>> > > Regards,
>>> > > Alexander
>>> > >
>>> > > 2016-06-08 17:13 GMT+02:00 Kunft, Andreas <andreas.ku...@tu-berlin.de>:
>>> > >
>>> > > > Hi Till,
>>> > > >
>>> > > > thanks for the fast answer.
>>> > > > I'll think about a concrete way of implementing it and open a JIRA.
>>> > > >
>>> > > > Best
>>> > > > Andreas
>>> > > > ________________________________________
>>> > > > From: Till Rohrmann <trohrm...@apache.org>
>>> > > > Sent: Wednesday, 8 June 2016 15:53
>>> > > > To: dev@flink.apache.org
>>> > > > Subject: Re: Broadcast data sent increases with # slots per TM
>>> > > >
>>> > > > Hi Andreas,
>>> > > >
>>> > > > your observation is correct. The data is sent to each slot and the
>>> > > > receiving TM only materializes one copy of the data. The rest of
>>> > > > the data is discarded.
>>> > > >
>>> > > > As far as I know, the reason why the broadcast variables are
>>> > > > implemented that way is that the senders would have to know which
>>> > > > sub-tasks are deployed to which TMs. Only then can you decide for
>>> > > > which sub-tasks you can send the data together. Since the output
>>> > > > emitters are agnostic to the actual deployment, the necessary
>>> > > > information would have to be forwarded to them.
>>> > > >
>>> > > > Another problem is that if you pick one of the sub-tasks to
>>> > > > receive the broadcast set, then you have to make sure that this
>>> > > > sub-task has read and materialized the broadcast set before the
>>> > > > other sub-tasks start working. One could maybe send the broadcast
>>> > > > set to one sub-task first and then, after the BC set has been
>>> > > > sent, send a kind of acknowledge record to all other sub-tasks.
>>> > > > That way, the other sub-tasks would block until the broadcast set
>>> > > > has been completely transmitted. But here one has to make sure
>>> > > > that the sub-task receiving the BC set has been deployed and is
>>> > > > not queued up for scheduling.
>>> > > >
>>> > > > So there are some challenges to solve in order to optimize the BC
>>> > > > sets. Currently, there is nobody working on it. If you want to
>>> > > > start working on it, then I would recommend opening a JIRA and
>>> > > > starting to write a design document for it.
>>> > > >
>>> > > > Cheers,
>>> > > > Till
>>> > > >
>>> > > > On Wed, Jun 8, 2016 at 1:45 PM, Kunft, Andreas <
>>> > > > andreas.ku...@tu-berlin.de> wrote:
>>> > > >
>>> > > > > Hi,
>>> > > > >
>>> > > > > we experience some unexpected increase of data sent over the
>>> > > > > network for broadcasts with an increasing number of slots per
>>> > > > > TaskManager.
>>> > > > >
>>> > > > > We provided a benchmark [1]. It not only increases the amount of
>>> > > > > data sent over the network but also hurts performance, as seen
>>> > > > > in the preliminary results below. In these results, cloud-11 has
>>> > > > > 25 nodes and ibm-power has 8 nodes, with the number of slots per
>>> > > > > node scaled from 1 to 16.
>>> > > > > >>> > > > > >>> > > > > +-----------------------+--------------+-------------+ >>> > > > > | suite | name | median_time | >>> > > > > +=======================+==============+=============+ >>> > > > > | broadcast.cloud-11 | broadcast.01 | 8796 | >>> > > > > | broadcast.cloud-11 | broadcast.02 | 14802 | >>> > > > > | broadcast.cloud-11 | broadcast.04 | 30173 | >>> > > > > | broadcast.cloud-11 | broadcast.08 | 56936 | >>> > > > > | broadcast.cloud-11 | broadcast.16 | 117507 | >>> > > > > | broadcast.ibm-power-1 | broadcast.01 | 6807 | >>> > > > > | broadcast.ibm-power-1 | broadcast.02 | 8443 | >>> > > > > | broadcast.ibm-power-1 | broadcast.04 | 11823 | >>> > > > > | broadcast.ibm-power-1 | broadcast.08 | 21655 | >>> > > > > | broadcast.ibm-power-1 | broadcast.16 | 37426 | >>> > > > > +-----------------------+--------------+-------------+ >>> > > > > >>> > > > > >>> > > > > >>> > > > > After looking into the code base it, it seems that the data is >>> > > > > de-serialized only once per TM, but the actual data is sent for >>> all >>> > > slots >>> > > > > running the operator with broadcast vars and just gets discarded >>> in >>> > > case >>> > > > > its already de-serialized. >>> > > > > >>> > > > > >>> > > > > I do not see a reason the data can't be shared among the slots >>> of a >>> > TM >>> > > > and >>> > > > > therefore just sent once, but I guess it would require quite some >>> > > changes >>> > > > > bc sets are handled currently. >>> > > > > >>> > > > > >>> > > > > Are there any future plans regarding this and/or is there >>> interest in >>> > > > this >>> > > > > "feature"? >>> > > > > >>> > > > > >>> > > > > Best >>> > > > > >>> > > > > Andreas? >>> > > > > >>> > > > > >>> > > > > [1] https://github.com/TU-Berlin-DIMA/flink-broadcast? >>> > > > > >>> > > > > >>> > > > > >>> > > > >>> > > >>> > >>> >> >> >