You can force a broadcast, but with tables that large it's probably not a
good idea.  However, filtering and then broadcasting one side of the join is
likely to get you the benefits of broadcasting (no shuffle of the larger
table, which would otherwise colocate all the skewed tuples on a single
overloaded executor) without attempting to broadcast something that's too large.
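
Roughly something along these lines (an untested sketch against the Spark
2.0 Scala API; df1, df2, the "id" column, and the outer join type come from
Jestin's description, everything else is a placeholder):

  import org.apache.spark.sql.functions.{broadcast, col}

  // Split both sides on the skewed key so the skewed part is handled separately.
  val df1Skew = df1.filter(col("id") === 0)   // the many 0-keyed rows
  val df1Rest = df1.filter(col("id") =!= 0)
  val df2Skew = df2.filter(col("id") === 0)   // hopefully small once filtered
  val df2Rest = df2.filter(col("id") =!= 0)

  // Broadcast only the filtered (small) side of the skewed join; the
  // non-skewed part can use a regular shuffle join since its keys are
  // spread out evenly.
  val joinedSkew = df1Skew.join(broadcast(df2Skew), Seq("id"), "outer")
  val joinedRest = df1Rest.join(df2Rest, Seq("id"), "outer")

  val result = joinedSkew.union(joinedRest)

That way the only shuffle left is on the well-distributed keys.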

On Sun, Aug 14, 2016 at 11:02 AM, Jacek Laskowski <ja...@japila.pl> wrote:

> Hi Michael,
>
> As I understand broadcast joins, Jestin could also use the broadcast
> function on a Dataset to mark it for broadcasting. Jestin could force the
> broadcast that way, without the filtering trick, hoping it kicks off a
> broadcast join. Correct?
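>
> A one-liner sketch of what I mean (table names are just placeholders, and
> it only helps if the broadcast side actually fits in memory):
>
>   import org.apache.spark.sql.functions.broadcast
>
>   df1.join(broadcast(df2), Seq("id"), "outer")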
>
> Pozdrawiam,
> Jacek Laskowski
> ----
> https://medium.com/@jaceklaskowski/
> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
> Follow me at https://twitter.com/jaceklaskowski
>
>
> On Sun, Aug 14, 2016 at 9:51 AM, Michael Armbrust
> <mich...@databricks.com> wrote:
> > Have you tried doing the join in two parts (id == 0 and id != 0) and then
> > doing a union of the results?  It is possible that with this technique, the
> > join which only contains the skewed data would be filtered enough to allow
> > broadcasting of one side.
> >
> > On Sat, Aug 13, 2016 at 11:15 PM, Jestin Ma <jestinwith.a...@gmail.com>
> > wrote:
> >>
> >> Hi, I'm currently trying to perform an outer join between two
> >> DataFrames/Datasets on a column, id; one is ~150 GB, the other is ~50 GB.
> >>
> >> df1.id is skewed in that there are many 0's, the rest being unique IDs.
> >>
> >> df2.id is not skewed. If I filter df1 to id != 0, then the join works well.
> >> If I don't, then the join does not complete for a very, very long time.
> >>
> >> I have diagnosed the problem as hash partitioning on the join IDs: because
> >> of the data skew, one partition ends up containing a huge number of values.
> >> One executor ends up reading most of the shuffle data and writing all of
> >> the shuffle data, as shown below.
> >>
> >> [Screenshot: the task in question, assigned to a single executor.]
> >>
> >> [Screenshot from one of the executors: a single thread spilling sort data,
> >> since the executor cannot hold 90%+ of the ~200 GB result in memory.]
> >>
> >> Moreover, looking at the event timeline, I find that the executor on that
> >> task spends about 20% of its time reading shuffle data, 70% on computation,
> >> and 10% writing output data.
> >>
> >> I have tried the following:
> >>
> >> "Salting" the 0-value keys with monotonically_increasing_id().mod(N)
> >> - This doesn't seem to have an effect, since now I have hundreds/thousands
> >> of keys, each with tens of thousands of occurrences.
> >> - Should I increase N? Is there a way to just do a random mod(N) instead of
> >> monotonically_increasing_id()? (See the rand()-based sketch after this list.)
> >>
> >> Repartitioning on a column I know contains unique values
> >> - This is overridden by Spark's sort-based shuffle manager, which hash
> >> repartitions on the skewed join column.
> >> - Is it possible to change this? Or will the join column always need to be
> >> hashed and partitioned on for joins to work?
> >>
> >> Broadcasting does not work, since my tables are too large.
> >>
> >> Increasing/decreasing spark.sql.shuffle.partitions does not remedy the data
> >> skew, since the 0-value keys are still being hashed to the same partition.
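> >>
> >> For reference, a rand()-based variant of the salting would look roughly
> >> like this (a sketch, not what I actually ran; N and the column names are
> >> placeholders, and it is shown for the inner-join case, since the replicated
> >> side would need extra handling for a full outer join):
> >>
> >>   import org.apache.spark.sql.functions._
> >>
> >>   val N = 100  // number of salt buckets; would need tuning to the skew
> >>
> >>   // Give every row on the skewed side a random bucket...
> >>   val df1Salted = df1.withColumn("salt", (rand() * N).cast("int"))
> >>
> >>   // ...and replicate the other side once per bucket so each salted key
> >>   // still finds its match.
> >>   val saltValues = array((0 until N).map(lit): _*)
> >>   val df2Salted  = df2.withColumn("salt", explode(saltValues))
> >>
> >>   // Joining on (id, salt) spreads the 0-keys over N partitions.
> >>   val joined = df1Salted.join(df2Salted, Seq("id", "salt")).drop("salt")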
> >>
> >>
> >> ----------------------------------
> >>
> >> What I am considering currently is doing the join at the RDD level, but is
> >> there any level of control there which can solve my skewed data problem?
> >> Other than that, see the bolded question.
> >>
> >> I would appreciate any suggestions/tips/experience with this. Thank you!
> >>
> >
>