The problem is that it shuffles the wrong table, which won’t fit on my disk 
even when compressed.

Actually, I found the source of the problem, although I could not reproduce it 
with synthetic data (it still holds for my original data: the big table has 2B 
rows, the small one 500K):

When I join on two fields, like this: “select big.f1, big.f2 from small inner 
join big on big.s=small.s and big.d=small.d”, the query planner chooses 
ShuffledHashJoin, which shuffles the bigger table as well, and the query fails 
because the shuffle write of the whole big table runs out of disk space:
== Physical Plan ==
Project [bedId#28,sensorId#30L,time#31,label#32,fft#34]
ShuffledHashJoin [time#38,bedId#35], [time#31,bedId#28], BuildLeft
  Exchange (HashPartitioning [time#38,bedId#35], 2000), []
    PhysicalRDD [bedId#35,time#38], MapPartitionsRDD[75] at explain at <console>:25
  Exchange (HashPartitioning [time#31,bedId#28], 2000), []
    PhysicalRDD [bedId#28,sensorId#30L,fft#34,label#32,time#31], MapPartitionsRDD[77] at explain at <console>:25

When I join on one field, like this: “select big.f1, big.f2 from small inner 
join big on big.s=small.s”, the query planner chooses BroadcastHashJoin, which 
writes only what is needed and therefore executes without problems:

== Physical Plan ==
Project [bedId#28,sensorId#30L,time#31,label#32,fft#34]
BroadcastHashJoin [time#38], [time#31], BuildLeft
  Limit 498340
   PhysicalRDD [time#38], MapPartitionsRDD[66] at explain at <console>:25
  PhysicalRDD [bedId#28,sensorId#30L,fft#34,label#32,time#31], MapPartitionsRDD[68] at explain at <console>:25
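
For readers following the two plans above, here is a minimal plain-Scala sketch 
of the idea behind a broadcast hash join on a composite key. It is not Spark's 
implementation; the row types Small and Big, the Int type of f1 and f2, and the 
object name BroadcastJoinSketch are hypothetical and exist only for 
illustration. It shows that joining on two fields does not by itself require 
repartitioning the big side: the small side is hashed on both columns and the 
big side is streamed past it once.

```scala
// Hypothetical row types standing in for the two tables in this thread.
case class Small(s: String, d: Double)
case class Big(s: String, d: Double, f1: Int, f2: Int)

object BroadcastJoinSketch {
  // Hash the small side on BOTH join columns, then probe it while
  // streaming the big side. The big side is never repartitioned.
  def join(small: Seq[Small], big: Iterator[Big]): Iterator[(Int, Int)] = {
    // Key -> number of matching small rows (inner join keeps duplicates).
    val matches: Map[(String, Double), Int] =
      small.groupBy(r => (r.s, r.d)).map { case (k, rows) => k -> rows.size }

    big.flatMap { b =>
      val n = matches.getOrElse((b.s, b.d), 0)
      Iterator.fill(n)((b.f1, b.f2)) // emit one output row per match
    }
  }

  def main(args: Array[String]): Unit = {
    val small = Seq(Small("a", 1.0), Small("b", 2.0))
    val big = Iterator(
      Big("a", 1.0, 10, 11),
      Big("a", 2.0, 20, 21), // no match: s matches but d does not
      Big("b", 2.0, 30, 31))
    join(small, big).foreach(println)
  }
}
```

In a cluster setting the hashed small side would have to be shipped to every 
worker and held in memory there, which is why this strategy only makes sense 
when the small side is genuinely small.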

Could the Spark SQL developers suggest why this happens?

Best regards, Alexander


From: Stephen Carman [mailto:scar...@coldlight.com]
Sent: Wednesday, June 24, 2015 12:33 PM
To: Ulanov, Alexander
Cc: CC GP; dev@spark.apache.org
Subject: Re: Force inner join to shuffle the smallest table

Have you tried shuffle compression?

spark.shuffle.compress (true|false)

Also, if your filesystem supports it, I’ve noticed that file consolidation 
helps disk usage a bit.

spark.shuffle.consolidateFiles (true|false)

Steve

On Jun 24, 2015, at 3:27 PM, Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:

It also fails, as I mentioned in the original question.

From: CC GP [mailto:chandrika.gopalakris...@gmail.com]
Sent: Wednesday, June 24, 2015 12:08 PM
To: Ulanov, Alexander
Cc: dev@spark.apache.org
Subject: Re: Force inner join to shuffle the smallest table

Try the query below and see if it makes a difference:

val result = sqlContext.sql(
  "select big.f1, big.f2 from small inner join big " +
  "on big.s=small.s and big.d=small.d")

On Wed, Jun 24, 2015 at 11:35 AM, Ulanov, Alexander 
<alexander.ula...@hp.com> wrote:
Hi,

I am trying to inner join two tables on two fields (a string and a double). One 
table has 2B rows, the other 500K. They are stored in HDFS as Parquet. Spark 
v1.4.
val big = sqlContext.parquetFile("hdfs://big")
big.registerTempTable("big")
val small = sqlContext.parquetFile("hdfs://small")
small.registerTempTable("small")
val result = sqlContext.sql(
  "select big.f1, big.f2 from big inner join small " +
  "on big.s=small.s and big.d=small.d")

This query fails partway through because one of the workers runs out of disk 
space, with the shuffle reported at 1.8TB, which is the maximum size of my 
Spark working dirs (in total across 7 worker nodes). This is surprising, 
because the “big” table takes 2TB of disk space (unreplicated) and “small” 
about 5GB, and I would expect the optimizer to shuffle the small table. How can 
I force Spark to shuffle the small table? I tried writing “small inner join 
big”, but it also fails with 1.8TB of shuffle.

Best regards, Alexander
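
One knob that bears on the question above is the Spark SQL setting 
spark.sql.autoBroadcastJoinThreshold, which caps the estimated size (in bytes) 
of a table that the planner will broadcast instead of shuffling. The sketch 
below is only an illustration under stated assumptions: it assumes a 
spark-shell session on Spark 1.4 where sqlContext is predefined and the temp 
tables "big" and "small" are already registered, and the 1 GB value is an 
arbitrary example, not a recommendation. Whether the planner has a usable size 
estimate for the small table depends on how that table was produced, so the 
plan should be checked with explain() before running the query.

```scala
// Assumes spark-shell on Spark 1.4: `sqlContext` is provided by the shell,
// and the temp tables "big" and "small" have already been registered.

// Raise the broadcast threshold (in bytes). The default is 10 MB and -1
// disables broadcast joins; 1 GB here is an illustrative value only.
sqlContext.setConf(
  "spark.sql.autoBroadcastJoinThreshold",
  (1024L * 1024 * 1024).toString)

val result = sqlContext.sql(
  "select big.f1, big.f2 from big inner join small " +
  "on big.s=small.s and big.d=small.d")

// Print the physical plan and look for BroadcastHashJoin vs ShuffledHashJoin
// before triggering any action on `result`.
result.explain()
```

A broadcast table has to fit in memory on the driver and on every executor, so 
this only helps if the columns actually needed from the small table are well 
below the memory available there.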

