RE: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-24 Thread Yong Zhang
The patch works as I expected.
The DAG shows the broadcast join happening in stage 4, eliminating the following 
stages, with the data generated right after it (there is no shuffle write in that 
stage any more). Much faster than before.
If you would like this patch back-ported to 1.5 and 1.6, please vote on 
JIRA SPARK-13383 
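A quick way to confirm the hint took effect, as a sketch (assuming the Spark 1.6.x DataFrame API; the variable names come from this thread):

```scala
import org.apache.spark.sql.functions.broadcast

// With the patch applied, the physical plan should show a BroadcastHashJoin
// and no shuffle (TungstenExchange) on the large historyRaw side.
val join2 = historyRaw.join(broadcast(trialRaw),
  trialRaw("visid_high") === historyRaw("visid_high") &&
  trialRaw("visid_low")  === historyRaw("visid_low")  &&
  trialRaw("date_time")  >   historyRaw("date_time"))

join2.explain(true)  // look for "BroadcastHashJoin" in the physical plan
```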

Thanks
Yong
From: java8...@hotmail.com
To: dav...@databricks.com
CC: user@spark.apache.org
Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in the 
last step
Date: Wed, 23 Mar 2016 20:30:42 -0400




Sounds good.
I will manually merge this patch onto 1.6.1, test my case again tomorrow in 
my environment, and update later.
Thanks
Yong

> Date: Wed, 23 Mar 2016 16:20:23 -0700
> Subject: Re: Spark 1.5.2, why the broadcast join shuffle so much data in the 
> last step
> From: dav...@databricks.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
> 
> On Wed, Mar 23, 2016 at 10:35 AM, Yong Zhang <java8...@hotmail.com> wrote:
> > Here is the output:
> >
> > == Parsed Logical Plan ==
> > Project [400+ columns]
> > +- Project [400+ columns]
> >+- Project [400+ columns]
> >   +- Project [400+ columns]
> >  +- Join Inner, Some((((visid_high#460L = visid_high#948L) &&
> > (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> > :- Relation[400+ columns] ParquetRelation
> > +- BroadcastHint
> >+- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >   +- Filter (instr(event_list#105,202) > 0)
> >  +- Relation[400+ columns] ParquetRelation
> >
> > == Analyzed Logical Plan ==
> > 400+ columns
> > Project [400+ columns]
> > +- Project [400+ columns]
> >+- Project [400+ columns]
> >   +- Project [400+ columns]
> >  +- Join Inner, Some((((visid_high#460L = visid_high#948L) &&
> > (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> > :- Relation[400+ columns] ParquetRelation
> > +- BroadcastHint
> >+- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >   +- Filter (instr(event_list#105,202) > 0)
> >  +- Relation[400+ columns] ParquetRelation
> >
> > == Optimized Logical Plan ==
> > Project [400+ columns]
> > +- Join Inner, Some((((visid_high#460L = visid_high#948L) && (visid_low#461L
> > = visid_low#949L)) && (date_time#25L > date_time#513L)))
> >:- Relation[400+ columns] ParquetRelation
> >+- Project [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
> >   +- BroadcastHint
> >  +- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> > +- Filter (instr(event_list#105,202) > 0)
> >+- Relation[400+ columns] ParquetRelation
> 
> There is a Project on top of the BroadcastHint, inserted by the
> column-pruning rule, which makes the SparkStrategy unable to
> recognize the BroadcastHint anymore; it was fixed
> recently in master [1]
> 
> https://github.com/apache/spark/pull/11260
> 
> Your join should run as expected in master.
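Until that fix lands, a hypothetical workaround (my assumption, not something stated in this thread) is to project the needed join columns before applying the hint, so the column-pruning rule has no extra Project to insert above the BroadcastHint:

```scala
import org.apache.spark.sql.functions.broadcast

// Sketch only: pre-select exactly the columns the join and later steps need,
// then apply broadcast() on the already-pruned DataFrame. Column names are
// taken from the plan above; whether this avoids the inserted Project on
// 1.5/1.6 would need to be verified with explain(true).
val trialSmall = trialRaw
  .filter("instr(event_list, '202') > 0")
  .select("visid_high", "visid_low", "date_time", "soid_e1")

val join2 = historyRaw.join(broadcast(trialSmall),
  trialSmall("visid_high") === historyRaw("visid_high") &&
  trialSmall("visid_low")  === historyRaw("visid_low")  &&
  trialSmall("date_time")  >   historyRaw("date_time"))
```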
> 
> > == Physical Plan ==
> > Project [400+ columns]
> > +- Filter (date_time#25L > date_time#513L)
> >+- SortMergeJoin [visid_high#948L,visid_low#949L],
> > [visid_high#460L,visid_low#461L]
> >   :- Sort [visid_high#948L ASC,visid_low#949L ASC], false, 0
> >   :  +- TungstenExchange
> > hashpartitioning(visid_high#948L,visid_low#949L,200), None
> >   : +- Scan ParquetRelation[400+ columns] InputPaths:
> > hdfs://xxx/2015/12/17, hdfs://xxx/2015/12/18, hdfs://xxx/2015/12/19,
> > hdfs://xxx/2015/12/20, hdfs://xxx/2015/12/21, hdfs://xxx/2015/12/22,
> > hdfs://xxx/2015/12/23, hdfs://xxx/2015/12/24, hdfs://xxx/2015/12/25,
> > hdfs://xxx/2015/12/26, hdfs://xxx/2015/12/27, hdfs://xxx/2015/12/28,
> > hdfs://xxx/2015/12/29, hdfs://xxx/2015/12/30, hdfs://xxx/2015/12/31,
> > hdfs://xxx/2016/01/01, hdfs://xxx/2016/01/02, hdfs://xxx/2016/01/03,
> > hdfs://xxx/2016/01/04, hdfs://xxx/2016/01/05, hdfs://xxx/2016/01/06,
> > hdfs://xxx/2016/01/07, hdfs://xxx/2016/01/08, hdfs://xxx/2016/01/09,
> > hdfs://xxx/2016/01/10, hdfs://xxx/2016/01/11, hdfs://xxx/2016/01/12,
> > hdfs://xxx/2016/01/13, hdfs://xxx/2016/01/14, hdfs://xxx/2016/01/15,
> > hdfs://xxx/2016/01/16, hdfs://xxx/2016/01/17, hdfs://xxx/2016/01/18,
> > hdfs://xxx/2016/01/19, hdfs://xxx/2016/01/20,

RE: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-23 Thread Yong Zhang
Sounds good.
I will manually merge this patch onto 1.6.1, test my case again tomorrow in 
my environment, and update later.
Thanks
Yong

> Date: Wed, 23 Mar 2016 16:20:23 -0700
> Subject: Re: Spark 1.5.2, why the broadcast join shuffle so much data in the 
> last step
> From: dav...@databricks.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
> 
> On Wed, Mar 23, 2016 at 10:35 AM, Yong Zhang <java8...@hotmail.com> wrote:
> > Here is the output:
> >
> > == Parsed Logical Plan ==
> > Project [400+ columns]
> > +- Project [400+ columns]
> >+- Project [400+ columns]
> >   +- Project [400+ columns]
> >  +- Join Inner, Some((((visid_high#460L = visid_high#948L) &&
> > (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> > :- Relation[400+ columns] ParquetRelation
> > +- BroadcastHint
> >+- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >   +- Filter (instr(event_list#105,202) > 0)
> >  +- Relation[400+ columns] ParquetRelation
> >
> > == Analyzed Logical Plan ==
> > 400+ columns
> > Project [400+ columns]
> > +- Project [400+ columns]
> >+- Project [400+ columns]
> >   +- Project [400+ columns]
> >  +- Join Inner, Some((((visid_high#460L = visid_high#948L) &&
> > (visid_low#461L = visid_low#949L)) && (date_time#25L > date_time#513L)))
> > :- Relation[400+ columns] ParquetRelation
> > +- BroadcastHint
> >+- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >   +- Filter (instr(event_list#105,202) > 0)
> >  +- Relation[400+ columns] ParquetRelation
> >
> > == Optimized Logical Plan ==
> > Project [400+ columns]
> > +- Join Inner, Some((((visid_high#460L = visid_high#948L) && (visid_low#461L
> > = visid_low#949L)) && (date_time#25L > date_time#513L)))
> >:- Relation[400+ columns] ParquetRelation
> >+- Project [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
> >   +- BroadcastHint
> >  +- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> > +- Filter (instr(event_list#105,202) > 0)
> >+- Relation[400+ columns] ParquetRelation
> 
> There is a Project on top of the BroadcastHint, inserted by the
> column-pruning rule, which makes the SparkStrategy unable to
> recognize the BroadcastHint anymore; it was fixed
> recently in master [1]
> 
> https://github.com/apache/spark/pull/11260
> 
> Your join should run as expected in master.
> 
> > == Physical Plan ==
> > Project [400+ columns]
> > +- Filter (date_time#25L > date_time#513L)
> >+- SortMergeJoin [visid_high#948L,visid_low#949L],
> > [visid_high#460L,visid_low#461L]
> >   :- Sort [visid_high#948L ASC,visid_low#949L ASC], false, 0
> >   :  +- TungstenExchange
> > hashpartitioning(visid_high#948L,visid_low#949L,200), None
> >   : +- Scan ParquetRelation[400+ columns] InputPaths:
> > hdfs://xxx/2015/12/17, hdfs://xxx/2015/12/18, hdfs://xxx/2015/12/19,
> > hdfs://xxx/2015/12/20, hdfs://xxx/2015/12/21, hdfs://xxx/2015/12/22,
> > hdfs://xxx/2015/12/23, hdfs://xxx/2015/12/24, hdfs://xxx/2015/12/25,
> > hdfs://xxx/2015/12/26, hdfs://xxx/2015/12/27, hdfs://xxx/2015/12/28,
> > hdfs://xxx/2015/12/29, hdfs://xxx/2015/12/30, hdfs://xxx/2015/12/31,
> > hdfs://xxx/2016/01/01, hdfs://xxx/2016/01/02, hdfs://xxx/2016/01/03,
> > hdfs://xxx/2016/01/04, hdfs://xxx/2016/01/05, hdfs://xxx/2016/01/06,
> > hdfs://xxx/2016/01/07, hdfs://xxx/2016/01/08, hdfs://xxx/2016/01/09,
> > hdfs://xxx/2016/01/10, hdfs://xxx/2016/01/11, hdfs://xxx/2016/01/12,
> > hdfs://xxx/2016/01/13, hdfs://xxx/2016/01/14, hdfs://xxx/2016/01/15,
> > hdfs://xxx/2016/01/16, hdfs://xxx/2016/01/17, hdfs://xxx/2016/01/18,
> > hdfs://xxx/2016/01/19, hdfs://xxx/2016/01/20, hdfs://xxx/2016/01/21,
> > hdfs://xxx/2016/01/22, hdfs://xxx/2016/01/23, hdfs://xxx/2016/01/24,
> > hdfs://xxx/2016/01/25, hdfs://xxx/2016/01/26, hdfs://xxx/2016/01/27,
> > hdfs://xxx/2016/01/28, hdfs://xxx/2016/01/29, hdfs://xxx/2016/01/30,
> > hdfs://xxx/2016/01/31, hdfs://xxx/2016/02/01, hdfs://xxx/2016/02/02,
> > hdfs://xxx/2016/02/03, hdfs://xxx/2016/02/04, hdfs://xxx/2016/02/05,
> > hdfs://xxx/2016/02/06, hdfs://xxx/2016/02/07, hdfs://xxx/2016/02/08,
> > hdfs://xxx/2016/02/09, hdfs:/

Re: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-23 Thread Ted Yu
17,
> > hdfs://xxx/2016/02/18, hdfs://xxx/2016/02/19, hdfs://xxx/2016/02/20,
> > hdfs://xxx/2016/02/21, hdfs://xxx/2016/02/22, hdfs://xxx/2016/02/23,
> > hdfs://xxx/2016/02/24, hdfs://xxx/2016/02/25, hdfs://xxx/2016/02/26,
> > hdfs://xxx/2016/02/27, hdfs://xxx/2016/02/28, hdfs://xxx/2016/02/29,
> > hdfs://xxx/2016/03/01, hdfs://xxx/2016/03/02, hdfs://xxx/2016/03/03,
> > hdfs://xxx/2016/03/04, hdfs://xxx/2016/03/05, hdfs://xxx/2016/03/06,
> > hdfs://xxx/2016/03/07, hdfs://xxx/2016/03/08, hdfs://xxx/2016/03/09,
> > hdfs://xxx/2016/03/10, hdfs://xxx/2016/03/11, hdfs://xxx/2016/03/12,
> > hdfs://xxx/2016/03/13, hdfs://xxx/2016/03/14, hdfs://xxx/2016/03/15,
> > hdfs://xxx/2016/03/16, hdfs://xxx/2016/03/17
> >   +- Sort [visid_high#460L ASC,visid_low#461L ASC], false, 0
> >  +- TungstenExchange
> > hashpartitioning(visid_high#460L,visid_low#461L,200), None
> > +- Project
> > [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
> >    +- Project [soid_e1#30 AS
> > account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
> >   +- Filter (instr(event_list#105,202) > 0)
> >  +- Scan
> >
> ParquetRelation[visid_low#461L,ip#127,soid_e1#30,event_list#105,visid_high#460L,date_time#25L]
> > InputPaths: hdfs://xxx/2016/03/17
> >
> > This dataset has more than 480 columns in the parquet file, so I replaced
> them
> > with "400+ columns" to avoid blowing up the email, but I don't think this
> > has anything to do with the "broadcast" problem.
> >
> > Thanks
> >
> > Yong
> >
> >
> >> Date: Wed, 23 Mar 2016 10:14:19 -0700
> >> Subject: Re: Spark 1.5.2, why the broadcast join shuffle so much data in
> >> the last step
> >> From: dav...@databricks.com
> >> To: java8...@hotmail.com
> >> CC: user@spark.apache.org
> >
> >>
> >> The broadcast hint does not work as expected in this case; could you
> >> also show the logical plan via 'explain(true)'?
> >>
> >> On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang <java8...@hotmail.com>
> wrote:
> >> >
> >> > So I am testing this code to understand the "broadcast" feature of DF on
> >> > Spark 1.6.1.
> >> > This time I did not disable "tungsten". Everything is at its default value,
> >> > except the memory and cores settings of my job on 1.6.1.
> >> >
> >> > I am testing the join2 case
> >> >
> >> > val join2 = historyRaw.join(broadcast(trialRaw),
> trialRaw("visid_high")
> >> > === historyRaw("visid_high") && trialRaw("visid_low") ===
> >> > historyRaw("visid_low") && trialRaw("date_time") >
> historyRaw("date_time"))
> >> >
> >> > and here is the DAG visualization in the runtime of my testing job:
> >> >
> >> >
> >> >
> >> >
> >> >
> >> > So now, I don't understand how "broadcast" works on a DataFrame in
> >> > Spark. I originally thought it would be the same as "mapjoin" in
> Hive,
> >> > but can someone explain the DAG above to me?
> >> >
> >> > I have one day of data, about a 1.5G compressed parquet file, filtered by
> >> > "instr(loadRaw("event_list"), "202") > 0", which outputs only
> about 1494
> >> > rows (very small); it is the "trialRaw" DF in my example.
> >> > Stage 3 has a filter, which I thought was for the trialRaw data, but
> the
> >> > stage statistics don't match the data. I don't know why the input
> is
> >> > only 78M and the shuffle write is about 97.6KB
> >> >
> >> >
> >> >
> >> >
> >> > The historyRaw will be about 90 days of history data, which should be
> about
> >> > 100G, so it looks like stage 4 is scanning it
> >> >
> >> >
> >> >
> >> >
> >> > Now, my original thought was that the small data would be broadcast to all
> >> > the nodes, and most of the history data would be filtered out by the join
> keys,
> >> > at least that is what "mapjoin" in Hive would do, but from the DAG
> above,
> >> > I didn't see it working this way.
> >> > It is more like Spark uses a SortMerge join to shuffle both datasets
> >> > across the network, and filters on the "reducers" side by t

Re: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-23 Thread Davies Liu
xx/2016/03/13, hdfs://xxx/2016/03/14, hdfs://xxx/2016/03/15,
> hdfs://xxx/2016/03/16, hdfs://xxx/2016/03/17
>   +- Sort [visid_high#460L ASC,visid_low#461L ASC], false, 0
>  +- TungstenExchange
> hashpartitioning(visid_high#460L,visid_low#461L,200), None
> +- Project
> [date_time#25L,visid_low#461L,visid_high#460L,account_id#976]
>+- Project [soid_e1#30 AS
> account_id#976,visid_high#460L,visid_low#461L,date_time#25L,ip#127]
>   +- Filter (instr(event_list#105,202) > 0)
>  +- Scan
> ParquetRelation[visid_low#461L,ip#127,soid_e1#30,event_list#105,visid_high#460L,date_time#25L]
> InputPaths: hdfs://xxx/2016/03/17
>
> This dataset has more than 480 columns in the parquet file, so I replaced them
> with "400+ columns" to avoid blowing up the email, but I don't think this
> has anything to do with the "broadcast" problem.
>
> Thanks
>
> Yong
>
>
>> Date: Wed, 23 Mar 2016 10:14:19 -0700
>> Subject: Re: Spark 1.5.2, why the broadcast join shuffle so much data in
>> the last step
>> From: dav...@databricks.com
>> To: java8...@hotmail.com
>> CC: user@spark.apache.org
>
>>
>> The broadcast hint does not work as expected in this case; could you
>> also show the logical plan via 'explain(true)'?
>>
>> On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang <java8...@hotmail.com> wrote:
>> >
>> > So I am testing this code to understand the "broadcast" feature of DF on
>> > Spark 1.6.1.
>> > This time I did not disable "tungsten". Everything is at its default value,
>> > except the memory and cores settings of my job on 1.6.1.
>> >
>> > I am testing the join2 case
>> >
>> > val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high")
>> > === historyRaw("visid_high") && trialRaw("visid_low") ===
>> > historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
>> >
>> > and here is the DAG visualization in the runtime of my testing job:
>> >
>> >
>> >
>> >
>> >
>> > So now, I don't understand how "broadcast" works on a DataFrame in
>> > Spark. I originally thought it would be the same as "mapjoin" in Hive,
>> > but can someone explain the DAG above to me?
>> >
>> > I have one day of data, about a 1.5G compressed parquet file, filtered by
>> > "instr(loadRaw("event_list"), "202") > 0", which outputs only about
>> > 1494
>> > rows (very small); it is the "trialRaw" DF in my example.
>> > Stage 3 has a filter, which I thought was for the trialRaw data, but the
>> > stage statistics don't match the data. I don't know why the input is
>> > only 78M and the shuffle write is about 97.6KB
>> >
>> >
>> >
>> >
>> > The historyRaw will be about 90 days of history data, which should be about
>> > 100G, so it looks like stage 4 is scanning it
>> >
>> >
>> >
>> >
>> > Now, my original thought was that the small data would be broadcast to all
>> > the nodes, and most of the history data would be filtered out by the join keys;
>> > at least that is what "mapjoin" in Hive would do, but from the DAG
>> > above,
>> > I didn't see it working this way.
>> > It is more like Spark uses a SortMerge join to shuffle both datasets
>> > across the network, and filters on the "reducers" side by the join keys, to get
>> > the final output. But that is not what a "broadcast" join is supposed to do,
>> > correct?
>> > In the last stage, it is very slow until it reaches and processes all
>> > the history data, shown below as "shuffle read" reaching 720G, to finish.
>> >
>> >
>> >
>> >
>> > One thing I notice is that if tungsten is enabled, the shuffle write volume
>> > on stage 4 is larger (720G) than when tungsten is disabled (506G) in my
>> > original run, for exactly the same input. It is an interesting point;
>> > does
>> > anyone have any idea about this?
>> >
>> >
>> > Overall, for my test case, "broadcast" join is exactly the most
>> > optimized way I should use; but somehow, I cannot make it behave the same way
>> > as
>> > "mapjoin" of Hive, even in Spark 1.6.1.
>> >
>> > As I said, this is just a test case. We have som

RE: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-23 Thread Yong Zhang
replaced them with 
"400+ columns" to avoid blowing up the email, but I don't think this has 
anything to do with the "broadcast" problem.
Thanks
Yong

> Date: Wed, 23 Mar 2016 10:14:19 -0700
> Subject: Re: Spark 1.5.2, why the broadcast join shuffle so much data in the 
> last step
> From: dav...@databricks.com
> To: java8...@hotmail.com
> CC: user@spark.apache.org
> 
> The broadcast hint does not work as expected in this case; could you
> also show the logical plan via 'explain(true)'?
> 
> On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang <java8...@hotmail.com> wrote:
> >
> > So I am testing this code to understand the "broadcast" feature of DF on Spark 
> > 1.6.1.
> > This time I did not disable "tungsten". Everything is at its default value, except 
> > the memory and cores settings of my job on 1.6.1.
> >
> > I am testing the join2 case
> >
> > val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> > historyRaw("visid_high") &&  trialRaw("visid_low") === 
> > historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
> >
> > and here is the DAG visualization in the runtime of my testing job:
> >
> >
> >
> >
> >
> > So now, I don't understand how "broadcast" works on a DataFrame in Spark. 
> > I originally thought it would be the same as "mapjoin" in Hive, but can 
> > someone explain the DAG above to me?
> >
> > I have one day of data, about a 1.5G compressed parquet file, filtered by 
> > "instr(loadRaw("event_list"), "202") > 0", which outputs only about 
> > 1494 rows (very small); it is the "trialRaw" DF in my example.
> > Stage 3 has a filter, which I thought was for the trialRaw data, but the 
> > stage statistics don't match the data. I don't know why the input is 
> > only 78M and the shuffle write is about 97.6KB
> >
> >
> >
> >
> > The historyRaw will be about 90 days of history data, which should be about 
> > 100G, so it looks like stage 4 is scanning it
> >
> >
> >
> >
> > Now, my original thought was that the small data would be broadcast to all the 
> > nodes, and most of the history data would be filtered out by the join keys; at 
> > least that is what "mapjoin" in Hive would do, but from the DAG above, I 
> > didn't see it working this way.
> > It is more like Spark uses a SortMerge join to shuffle both datasets 
> > across the network, and filters on the "reducers" side by the join keys, to get 
> > the final output. But that is not what a "broadcast" join is supposed to do, 
> > correct?
> > In the last stage, it is very slow until it reaches and processes all the 
> > history data, shown below as "shuffle read" reaching 720G, to finish.
> >
> >
> >
> >
> > One thing I notice is that if tungsten is enabled, the shuffle write volume on 
> > stage 4 is larger (720G) than when tungsten is disabled (506G) in my 
> > original run, for exactly the same input. It is an interesting point; 
> > does anyone have any idea about this?
> >
> >
> > Overall, for my test case, "broadcast" join is exactly the most optimized 
> > way I should use; but somehow, I cannot make it behave the same way as 
> > "mapjoin" of Hive, even in Spark 1.6.1.
> >
> > As I said, this is just a test case. We have business cases where it makes 
> > sense to use "broadcast" join, but until I understand exactly how to make 
> > it work as I expect in Spark, I don't know what to do.
> >
> > Yong
> >
> > 
> > From: java8...@hotmail.com
> > To: user@spark.apache.org
> > Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in 
> > the last step
> > Date: Tue, 22 Mar 2016 13:08:31 -0400
> >
> >
> > Please help me understand how "broadcast" works on a DF in Spark 
> > 1.5.2.
> >
> > Below are the 2 joins I tested and the physical plan I dumped:
> >
> > val join1 = historyRaw.join(trialRaw, trialRaw("visid_high") === 
> > historyRaw("visid_high") &&  trialRaw("visid_low") === 
> > historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
> > val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> > historyRaw("visid_high") &&  trialRaw(&q

Re: Spark 1.5.2, why the broadcast join shuffle so much data in the last step

2016-03-23 Thread Davies Liu
The broadcast hint does not work as expected in this case; could you
also show the logical plan via 'explain(true)'?

On Wed, Mar 23, 2016 at 8:39 AM, Yong Zhang <java8...@hotmail.com> wrote:
>
> So I am testing this code to understand the "broadcast" feature of DF on Spark 
> 1.6.1.
> This time I did not disable "tungsten". Everything is at its default value, except 
> the memory and cores settings of my job on 1.6.1.
>
> I am testing the join2 case
>
> val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
>
> and here is the DAG visualization in the runtime of my testing job:
>
>
>
>
>
> So now, I don't understand how "broadcast" works on a DataFrame in Spark. I 
> originally thought it would be the same as "mapjoin" in Hive, but can 
> someone explain the DAG above to me?
>
> I have one day of data, about a 1.5G compressed parquet file, filtered by 
> "instr(loadRaw("event_list"), "202") > 0", which outputs only about 1494 
> rows (very small); it is the "trialRaw" DF in my example.
> Stage 3 has a filter, which I thought was for the trialRaw data, but the stage 
> statistics don't match the data. I don't know why the input is only 78M 
> and the shuffle write is about 97.6KB
>
>
>
>
> The historyRaw will be about 90 days of history data, which should be about 
> 100G, so it looks like stage 4 is scanning it
>
>
>
>
> Now, my original thought was that the small data would be broadcast to all the 
> nodes, and most of the history data would be filtered out by the join keys; at 
> least that is what "mapjoin" in Hive would do, but from the DAG above, I 
> didn't see it working this way.
> It is more like Spark uses a SortMerge join to shuffle both datasets across the 
> network, and filters on the "reducers" side by the join keys, to get the final 
> output. But that is not what a "broadcast" join is supposed to do, correct?
> In the last stage, it is very slow until it reaches and processes all the 
> history data, shown below as "shuffle read" reaching 720G, to finish.
>
>
>
>
> One thing I notice is that if tungsten is enabled, the shuffle write volume on 
> stage 4 is larger (720G) than when tungsten is disabled (506G) in my 
> original run, for exactly the same input. It is an interesting point; does 
> anyone have any idea about this?
>
>
> Overall, for my test case, "broadcast" join is exactly the most optimized way 
> I should use; but somehow, I cannot make it behave the same way as "mapjoin" of 
> Hive, even in Spark 1.6.1.
>
> As I said, this is just a test case. We have business cases where it makes sense 
> to use "broadcast" join, but until I understand exactly how to make it work 
> as I expect in Spark, I don't know what to do.
>
> Yong
>
> 
> From: java8...@hotmail.com
> To: user@spark.apache.org
> Subject: RE: Spark 1.5.2, why the broadcast join shuffle so much data in the 
> last step
> Date: Tue, 22 Mar 2016 13:08:31 -0400
>
>
> Please help me understand how "broadcast" works on a DF in Spark 1.5.2.
>
> Below are the 2 joins I tested and the physical plan I dumped:
>
> val join1 = historyRaw.join(trialRaw, trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
> val join2 = historyRaw.join(broadcast(trialRaw), trialRaw("visid_high") === 
> historyRaw("visid_high") &&  trialRaw("visid_low") === 
> historyRaw("visid_low") && trialRaw("date_time") > historyRaw("date_time"))
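For background on why join1 and join2 can plan differently (general Spark SQL behavior, not something discussed explicitly in this thread): even without the broadcast() hint, Spark picks a broadcast join automatically when a side's estimated size is under spark.sql.autoBroadcastJoinThreshold; the hint requests one regardless of size estimates. A minimal sketch, assuming a SQLContext named sqlContext:

```scala
// Raise the automatic broadcast threshold to 50MB (the documented default in
// this era of Spark is 10MB). Sides with known statistics below the threshold
// are broadcast without any hint; broadcast(df) marks a DataFrame explicitly,
// independent of the size estimate.
sqlContext.setConf("spark.sql.autoBroadcastJoinThreshold",
  (50L * 1024 * 1024).toString)
```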
>
> join1.explain(true)
> == Physical Plan ==
> Filter (date_time#25L > date_time#513L)
>  SortMergeJoin [visid_high#948L,visid_low#949L], 
> [visid_high#460L,visid_low#461L]
>   ExternalSort [visid_high#948L ASC,visid_low#949L ASC], false
>Exchange hashpartitioning(visid_high#948L,visid_low#949L)
> Scan ParquetRelation[hdfs://]
>   ExternalSort [visid_high#460L ASC,visid_low#461L ASC], false
>Exchange hashpartitioning(visid_high#460L,visid_low#461L)
> Project [soid_e1#30,visid_high#460L,visid_low#461L,date_time#25L]
>  Filter (instr(event_list#105,202) > 0)
>   Scan 
> ParquetRelation[hdfs://xxx/2016/03/17][visid_high#460L,