Hi Alex,

Can you attach the output of
sql("explain extended <your query>").collect.foreach(println)?

Thanks,

Yin

On Fri, Jan 16, 2015 at 1:54 PM, Alessandro Baretta <alexbare...@gmail.com>
wrote:

> Reynold,
>
> The source file you are directing me to is a little too terse for me to
> understand what exactly is going on. Let me tell you what I'm trying to do
> and what problems I'm encountering, so that you might be able to better
> direct my investigation of the SparkSQL codebase.
>
> I am computing the join of three tables that share the same three-field
> primary key; each table also has several other data fields. My first
> attempt at computing this join was in SQL, with a query much like this
> slightly simplified one:
>
>          SELECT
>           a.key1 key1, a.key2 key2, a.key3 key3,
>           a.data1   adata1,    a.data2    adata2,    ...
>           b.data1   bdata1,    b.data2    bdata2,    ...
>           c.data1   cdata1,    c.data2    cdata2,    ...
>         FROM a, b, c
>         WHERE
>           a.key1 = b.key1 AND a.key2 = b.key2 AND a.key3 = b.key3 AND
>           b.key1 = c.key1 AND b.key2 = c.key2 AND b.key3 = c.key3
>
> This code yielded a SparkSQL job containing 40,000 stages, which failed
> after filling up all available disk space on the worker nodes.
>
> I then wrote this join as a plain mapreduce join. The code looks roughly
> like this:
> val a_ = a.map(row => (key(row), ("a", row)))
> val b_ = b.map(row => (key(row), ("b", row)))
> val c_ = c.map(row => (key(row), ("c", row)))
> val join = new UnionRDD(sc, List(a_, b_, c_)).groupByKey
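>
> For what it's worth, a three-way cogroup may express the same grouping
> more directly than tagging and unioning; a sketch, assuming the same a,
> b, c RDDs and key function as above (Spark 1.2-era API, which needs the
> SparkContext._ implicits in scope for pair-RDD operations):
>
> import org.apache.spark.SparkContext._
>
> val joined = a.map(row => (key(row), row))
>   .cogroup(b.map(row => (key(row), row)),
>            c.map(row => (key(row), row)))
> // one shuffle, grouped by key: (key, (rows from a, rows from b,
> // rows from c)), with no origin tags to pattern-match afterwards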
>
> The mapreduce implementation yields approximately 1,600 stages and
> completes in a few minutes on a 256-core cluster. The huge difference in
> scale between the two jobs makes me think that SparkSQL is implementing my
> join as a cartesian product. This is the query plan--I'm not sure I can
> read it, but it does seem to imply that the filter conditions are not
> being pushed far enough down:
>
>  'Project [...]
>  'Filter (((('a.key1 = 'b.key1) && ('a.key2 = 'b.key2)) && ...)
>   'Join Inner, None
>    'Join Inner, None
>
> Could it be that SparkSQL is unable to push join conditions down from the
> WHERE clause into the joins themselves?
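>
> One experiment I can try (a sketch, assuming a SQLContext named
> sqlContext and the same tables as above): rewrite the query with the
> equi-join conditions attached to explicit JOIN ... ON clauses rather
> than left in the WHERE clause, and compare the plans:
>
> sqlContext.sql("""
>   SELECT
>     a.key1 key1, a.key2 key2, a.key3 key3,
>     a.data1 adata1, b.data1 bdata1, c.data1 cdata1
>   FROM a
>   JOIN b ON a.key1 = b.key1 AND a.key2 = b.key2 AND a.key3 = b.key3
>   JOIN c ON b.key1 = c.key1 AND b.key2 = c.key2 AND b.key3 = c.key3
> """)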
>
> Alex
>
> On Thu, Jan 15, 2015 at 10:36 AM, Reynold Xin <r...@databricks.com> wrote:
>
> > It's a bunch of strategies defined here:
> >
> > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
> >
> > In most common use cases (e.g. inner equi join), filters are pushed below
> > the join or into the join. Doing a cartesian product followed by a filter
> > is too expensive.
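> >
> > A minimal way to see this (a sketch, assuming the Spark 1.2-era
> > SchemaRDD API): register two small tables and print the physical plan
> > for an equi-join written as a WHERE filter; it should show a hash join
> > rather than CartesianProduct + Filter.
> >
> > import org.apache.spark.sql.SQLContext
> >
> > case class Rec(k: Int, v: String)
> >
> > val sqlContext = new SQLContext(sc)
> > import sqlContext.createSchemaRDD  // RDD of case classes -> SchemaRDD
> >
> > sc.parallelize(Seq(Rec(1, "x"))).registerTempTable("t1")
> > sc.parallelize(Seq(Rec(1, "y"))).registerTempTable("t2")
> >
> > // The equi-join condition lives in the WHERE clause, as in your
> > // query; the planner should still turn it into a join condition
> > // rather than a filter over a cartesian product.
> > println(sqlContext.sql(
> >   "SELECT t1.v, t2.v FROM t1, t2 WHERE t1.k = t2.k"
> > ).queryExecution.executedPlan)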
> >
> >
> > On Thu, Jan 15, 2015 at 7:39 AM, Alessandro Baretta
> > <alexbare...@gmail.com> wrote:
> >
> >> Hello,
> >>
> >> Where can I find docs about how joins are implemented in SparkSQL? In
> >> particular, I'd like to know whether they are implemented according to
> >> their relational algebra definition as filters on top of a cartesian
> >> product.
> >>
> >> Thanks,
> >>
> >> Alex
> >>
> >
> >
>
