Hmm, just found that there is no JoinHint that would allow what I described above.
Broadcasting one input and using the other to build a hash table is usually
not a good idea, because the broadcast side should be much smaller than the
other one...

2014-10-31 21:56 GMT+01:00 Fabian Hueske <[email protected]>:

> Just had another idea.
> The group-wise crossing that you are doing is actually a self-join on the
> grouping key.
> The system currently has no special strategy to deal with self-joins. That
> means both inputs of the join (which are identical) are treated as two
> individual inputs. If you force a broadcast of one side and build a hash
> partition on the other side, the following would happen:
>
> The broadcast input would be replicated and sent to each individual
> worker thread. The other input would remain local, still partitioned, and
> therefore smaller on each node. That's why you would build the hash table
> from the partitioned input. The larger, replicated input would be streamed
> against the hash tables. Because the inputs are not partitioned on the
> key, there should be no load-balancing issues (depending on the previous
> partitioning, it can even be perfectly balanced...)
>
> However, this might not work (well) if the input is too large to be
> replicated a lot (or the smaller partitions are too large for in-memory
> hash tables).
>
> Best, Fabian
>
> 2014-10-30 17:56 GMT+01:00 Fabian Hueske <[email protected]>:
>
>> Hi Martin,
>>
>> Flink does not have features to mitigate data skew at the moment, such
>> as dynamic partitioning.
>> That would also "only" allow processing large groups as individual
>> partitions and multiple smaller groups together in other partitions.
>> The issue of having a single large group would not be solved by that.
>> This is more on the application level right now and could, for example,
>> be solved by adding something like a group-cross operator...
>>
>> I think your approach of emitting multiple smaller partitions from a
>> group-reduce, reshuffling (there is a rebalance operator [1]), and
>> applying a flat-map sounds like a good idea to me.
>> At least, I didn't come up with a better approach ;-)
>>
>> Cheers, Fabian
>>
>> [1]
>> http://flink.incubator.apache.org/docs/0.7-incubating/programming_guide.html#transformations
>>
>> 2014-10-28 20:53 GMT+01:00 Martin Neumann <[email protected]>:
>>
>>> I have a problem with load balancing and was wondering how to deal
>>> with this kind of problem in Flink.
>>> The input is a data set of grouped IDs that I join with metadata for
>>> each ID. Then I need to compare each item in a group with each other
>>> item in that group, splitting it into different subgroups if necessary.
>>> In Flink it's a join followed by a group-reduce.
>>>
>>> The problem is that the groups differ a lot in size. 90% of the groups
>>> are done in 5 minutes while the rest takes 2 hours. To make this more
>>> efficient I would need to distribute the N-to-N comparison that is
>>> currently done in the group-reduce function. Does anyone have an idea
>>> how I can do that in a simple way?
>>>
>>> My current idea is to make the group-reduce step emit computation
>>> partitions and then do another flat-map step to do the actual
>>> computation.
>>> Would this solve the problem?
>>>
>>> cheers Martin
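Martin's plan (a group-reduce that emits comparison partitions, followed by a flat-map that does the actual work) can be sketched outside of Flink. This is a minimal, framework-free Python sketch of the idea, not Flink code: the group's items are cut into blocks, every pair of blocks becomes one independent record, and a flat-map stand-in then performs the pairwise comparisons for that record. All names (`emit_partitions`, `compare_partition`, `block_size`) are made up for illustration.

```python
from itertools import combinations, product

def emit_partitions(group, block_size):
    # Group-reduce stand-in: split one large group into blocks and emit
    # one record per unordered block pair. Comparing all items across a
    # block pair (or within a block, on the diagonal) covers every pair
    # of the original group exactly once.
    blocks = [group[i:i + block_size] for i in range(0, len(group), block_size)]
    for i in range(len(blocks)):
        for j in range(i, len(blocks)):
            yield (i == j, blocks[i], blocks[j])

def compare_partition(partition):
    # Flat-map stand-in: the actual pairwise work. Each partition is an
    # independent record, so after a rebalance these can run in parallel
    # instead of one worker grinding through the whole group.
    diagonal, left, right = partition
    pairs = combinations(left, 2) if diagonal else product(left, right)
    for x, y in pairs:
        yield (x, y)  # a real job would compare x and y here

group = list(range(10))
covered = [pair for part in emit_partitions(group, 4)
           for pair in compare_partition(part)]
# All C(10, 2) = 45 unordered pairs appear, each exactly once.
assert len(covered) == 45
assert len(set(map(frozenset, covered))) == 45
```

The number of emitted records grows quadratically with the number of blocks, so `block_size` trades per-record work against parallelism; with Flink's rebalance between the two steps, this should spread one huge group over all workers.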

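Fabian's observation at the top of the thread, that the group-wise cross is really a self-join on the grouping key, can be checked with a small framework-free Python sketch (the `records` data and variable names are made up for illustration):

```python
from collections import defaultdict
from itertools import product

records = [("a", 1), ("a", 2), ("b", 3), ("a", 4), ("b", 5)]

# Group-wise cross: collect each group, then cross the group with itself.
groups = defaultdict(list)
for key, value in records:
    groups[key].append(value)
group_cross = {(k, x, y) for k, values in groups.items()
               for x, y in product(values, values)}

# Self-join on the key: pair every record with every record sharing its key.
self_join = {(k1, x, y) for k1, x in records
             for k2, y in records if k1 == k2}

# Both formulations produce exactly the same set of keyed pairs.
assert group_cross == self_join
```

This is why a join execution strategy (broadcast vs. repartition, which side builds the hash table) could in principle substitute for a dedicated group-cross operator, which is what the JoinHint discussion above is probing.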