The problem is that it shuffles the wrong table, which even compressed won’t
fit on my disk.
Actually, I found the source of the problem, although I could not reproduce it
with synthetic data (it still holds for my original data: big table 2B rows,
small table 500K rows):
When I join on two fields, like this: “select big.f1, big.f2 from small inner
join big on big.s=small.s and big.d=small.d”, the query planner uses
ShuffledHashJoin and shuffles the bigger table, and the query fails because the
shuffle write of the whole big table runs out of disk space:
== Physical Plan ==
Project [bedId#28,sensorId#30L,time#31,label#32,fft#34]
 ShuffledHashJoin [time#38,bedId#35], [time#31,bedId#28], BuildLeft
  Exchange (HashPartitioning [time#38,bedId#35], 2000), []
   PhysicalRDD [bedId#35,time#38], MapPartitionsRDD[75] at explain at <console>:25
  Exchange (HashPartitioning [time#31,bedId#28], 2000), []
   PhysicalRDD [bedId#28,sensorId#30L,fft#34,label#32,time#31], MapPartitionsRDD[77] at explain at <console>:25
When I join on one field, like this: “select big.f1, big.f2 from small inner
join big on big.s=small.s”, the query planner uses BroadcastHashJoin, which
writes just what’s needed and therefore executes without problems:
== Physical Plan ==
Project [bedId#28,sensorId#30L,time#31,label#32,fft#34]
 BroadcastHashJoin [time#38], [time#31], BuildLeft
  Limit 498340
   PhysicalRDD [time#38], MapPartitionsRDD[66] at explain at <console>:25
  PhysicalRDD [bedId#28,sensorId#30L,fft#34,label#32,time#31], MapPartitionsRDD[68] at explain at <console>:25
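
To make the difference between the two plans concrete, here is a rough sketch in plain Scala collections, without Spark. It is only an illustration under simplifying assumptions and not how Spark implements either operator; the names Row, broadcastHashJoin and shuffledHashJoin are hypothetical.

```scala
object JoinSketch {
  // Hypothetical row type: a join key plus some payload.
  final case class Row(key: String, payload: String)

  // Broadcast-style join: build a hash table from the small side once,
  // then stream the big side through it. The big side is never repartitioned.
  def broadcastHashJoin(small: Seq[Row], big: Iterator[Row]): Iterator[(Row, Row)] = {
    val table: Map[String, Seq[Row]] = small.groupBy(_.key)
    big.flatMap(b => table.getOrElse(b.key, Seq.empty).map(s => (s, b)))
  }

  // Shuffle-style join: BOTH sides are first split into partitions by key hash
  // (the step whose cost grows with the size of both inputs),
  // then matching partitions are joined locally.
  def shuffledHashJoin(small: Seq[Row], big: Seq[Row], numPartitions: Int): Seq[(Row, Row)] = {
    def partition(rows: Seq[Row]): Map[Int, Seq[Row]] =
      rows.groupBy(r => math.abs(r.key.hashCode % numPartitions))
    val smallParts = partition(small)
    val bigParts = partition(big)
    (0 until numPartitions).flatMap { p =>
      val table = smallParts.getOrElse(p, Seq.empty).groupBy(_.key)
      bigParts.getOrElse(p, Seq.empty)
        .flatMap(b => table.getOrElse(b.key, Seq.empty).map(s => (s, b)))
    }
  }
}
```

Both functions return the same matches; the difference is that the shuffle variant has to materialize a partitioned copy of the big input, which is what runs out of disk in my case.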
Could the Spark SQL developers suggest why this happens?
Best regards, Alexander
From: Stephen Carman [mailto:[email protected]]
Sent: Wednesday, June 24, 2015 12:33 PM
To: Ulanov, Alexander
Cc: CC GP; [email protected]
Subject: Re: Force inner join to shuffle the smallest table
Have you tried shuffle compression?
spark.shuffle.compress (true|false)
If your filesystem supports it, I’ve also noticed that file consolidation helps
disk usage a bit.
spark.shuffle.consolidateFiles (true|false)
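
A minimal sketch of setting both options when the context is created. The two keys are the ones named above; the application name is a placeholder chosen for illustration.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// "join-example" is a hypothetical application name.
val conf = new SparkConf()
  .setAppName("join-example")
  .set("spark.shuffle.compress", "true")          // compress shuffle output files
  .set("spark.shuffle.consolidateFiles", "true")  // consolidate intermediate shuffle files
val sc = new SparkContext(conf)
```

The same keys can also be passed on the command line with --conf instead of being set in code.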
Steve
On Jun 24, 2015, at 3:27 PM, Ulanov, Alexander <[email protected]> wrote:
It also fails, as I mentioned in the original question.
From: CC GP [mailto:[email protected]]
Sent: Wednesday, June 24, 2015 12:08 PM
To: Ulanov, Alexander
Cc: [email protected]
Subject: Re: Force inner join to shuffle the smallest table
Try the query below and see if it makes a difference:
val result = sqlContext.sql("select big.f1, big.f2 from small inner join big on big.s=small.s and big.d=small.d")
On Wed, Jun 24, 2015 at 11:35 AM, Ulanov, Alexander <[email protected]> wrote:
Hi,
I am trying to inner join two tables on two fields (a string and a double). One
table has 2B rows, the other 500K. They are stored in HDFS as Parquet. Spark
v1.4.
val big = sqlContext.parquetFile("hdfs://big")
big.registerTempTable("big")
val small = sqlContext.parquetFile("hdfs://small")
small.registerTempTable("small")
val result = sqlContext.sql("select big.f1, big.f2 from big inner join small on big.s=small.s and big.d=small.d")
This query fails midway because one of the workers runs out of disk space, with
the shuffle reported at 1.8TB, which is the total capacity of my Spark working
dirs (across 7 worker nodes). This is surprising, because the “big” table takes
2TB of disk space (unreplicated) and “small” about 5GB, so I would expect the
optimizer to shuffle the small table. How can I force Spark to shuffle the small
table? I tried writing “small inner join big”, but it also fails with 1.8TB of
shuffle.
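
For reference, the join strategy can be checked before running the job by printing the physical plan. A minimal sketch, assuming the sqlContext and the two temp tables registered above. The threshold lookup is included only because spark.sql.autoBroadcastJoinThreshold is the setting that controls when Spark SQL chooses a broadcast join; whether changing it helps in this case is not established here.

```scala
// Assumes sqlContext plus the temp tables "big" and "small" from the snippet above.
val joined = sqlContext.sql(
  "select big.f1, big.f2 from small inner join big on big.s=small.s and big.d=small.d")

// Prints the physical plan (ShuffledHashJoin vs BroadcastHashJoin) without executing the query.
joined.explain()

// Shows the current broadcast threshold in bytes, or "unset" if it was never configured explicitly.
println(sqlContext.getConf("spark.sql.autoBroadcastJoinThreshold", "unset"))
```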
Best regards, Alexander