Hi Alex,

Can you attach the output of sql("explain extended <your query>").collect.foreach(println)?
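For example, something along these lines in the spark-shell should print the parsed, analyzed, optimized, and physical plans (a rough sketch only; it assumes a HiveContext bound to sqlContext and uses a placeholder query rather than your actual one):

    // Rough sketch: print the output of EXPLAIN EXTENDED.
    // sqlContext and the query text below are placeholders, not taken
    // from the actual job discussed in this thread.
    import org.apache.spark.sql.hive.HiveContext
    val sqlContext = new HiveContext(sc)
    sqlContext
      .sql("explain extended SELECT * FROM a JOIN b ON a.key1 = b.key1")
      .collect()
      .foreach(println)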
Thanks,

Yin

On Fri, Jan 16, 2015 at 1:54 PM, Alessandro Baretta <alexbare...@gmail.com> wrote:
> Reynold,
>
> The source file you are directing me to is a little too terse for me to
> understand what exactly is going on. Let me tell you what I'm trying to do
> and what problems I'm encountering, so that you might be able to better
> direct my investigation of the SparkSQL codebase.
>
> I am computing the join of three tables sharing the same primary key, which
> is composed of three fields; each table also has several other fields. My
> first attempt at computing this join was in SQL, with a query much like this
> slightly simplified one:
>
> SELECT
>   a.key1 key1, a.key2 key2, a.key3 key3,
>   a.data1 adata1, a.data2 adata2, ...
>   b.data1 bdata1, b.data2 bdata2, ...
>   c.data1 cdata1, c.data2 cdata2, ...
> FROM a, b, c
> WHERE
>   a.key1 = b.key1 AND a.key2 = b.key2 AND a.key3 = b.key3 AND
>   b.key1 = c.key1 AND b.key2 = c.key2 AND b.key3 = c.key3
>
> This code yielded a SparkSQL job containing 40,000 stages, which failed
> after filling up all available disk space on the worker nodes.
>
> I then wrote this join as a plain map-reduce join. The code looks roughly
> like this:
>
> val a_ = a.map(row => (key(row), ("a", row)))
> val b_ = b.map(row => (key(row), ("b", row)))
> val c_ = c.map(row => (key(row), ("c", row)))
> val join = new UnionRDD(sc, List(a_, b_, c_)).groupByKey
>
> This implementation yields approximately 1600 stages and completes in a few
> minutes on a 256-core cluster. The huge difference in scale between the two
> jobs makes me think that SparkSQL is implementing my join as a cartesian
> product. This is the query plan--I'm not sure I can read it, but it does
> seem to imply that the filter conditions are not being pushed down far
> enough:
>
> 'Project [...]
>  'Filter ((('a.key1 = 'b.key1) && ('a.key2 = 'b.key2)) && ...)
>   'Join Inner, None
>    'Join Inner, None
>
> Is SparkSQL perhaps unable to push join conditions down from the WHERE
> clause into the join itself?
>
> Alex
>
> On Thu, Jan 15, 2015 at 10:36 AM, Reynold Xin <r...@databricks.com> wrote:
> >
> > It's a bunch of strategies defined here:
> > https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkStrategies.scala
> >
> > In most common use cases (e.g. an inner equi-join), filters are pushed
> > below the join or into the join. Doing a cartesian product followed by a
> > filter is too expensive.
> >
> > On Thu, Jan 15, 2015 at 7:39 AM, Alessandro Baretta <alexbare...@gmail.com> wrote:
> >>
> >> Hello,
> >>
> >> Where can I find docs about how joins are implemented in SparkSQL? In
> >> particular, I'd like to know whether they are implemented according to
> >> their relational algebra definition, as filters on top of a cartesian
> >> product.
> >>
> >> Thanks,
> >>
> >> Alex
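As a point of comparison for the plan quoted above: the same three-way join can also be phrased with explicit ON clauses, which the equi-join strategies Reynold linked should be able to plan directly rather than falling back to a cartesian product plus a filter. A rough sketch only, assuming the three tables are registered as temporary tables and reusing the simplified column names from your example:

    // Sketch only: explicit JOIN ... ON form of the three-way equi-join.
    // Table and column names follow the simplified example in the thread;
    // sqlContext is assumed to be an existing SQLContext or HiveContext.
    val joined = sqlContext.sql("""
      SELECT
        a.key1, a.key2, a.key3,
        a.data1 AS adata1,
        b.data1 AS bdata1,
        c.data1 AS cdata1
      FROM a
        JOIN b ON a.key1 = b.key1 AND a.key2 = b.key2 AND a.key3 = b.key3
        JOIN c ON b.key1 = c.key1 AND b.key2 = c.key2 AND b.key3 = c.key3
    """)
    joined.collect().foreach(println)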