The problem is that it shuffles the wrong table which even compressed won’t fit to my disk.
Actually, I found the source of the problem, although I could not reproduce it with synthetic data (but remains true for my original data: big table 2B rows, small 500K): When I do join on two fields like this “select big.f1, big.f2 from small inner join big on big.s=small.s and big.d=small.d” then the query planner does ShuffledHashJoin with the bigger table and the query fails due to Shuffle write of the whole big table (out of space): == Physical Plan == Project [bedId#28,sensorId#30L,time#31,label#32,fft#34] ShuffledHashJoin [time#38,bedId#35], [time#31,bedId#28], BuildLeft Exchange (HashPartitioning [time#38,bedId#35], 2000), [] PhysicalRDD [bedId#35,time#38], MapPartitionsRDD[75] at explain at <console>:25 Exchange (HashPartitioning [time#31,bedId#28], 2000), [] PhysicalRDD [bedId#28,sensorId#30L,fft#34,label#32,time#31], MapPartitionsRDD[77] at explain at <console>:25 When I do join on one field like this “select big.f1, big.f2 from small inner join big on big.s=small.s” then the query planner does BroadcastHashJoin which writes just what’s needed and therefore executes without problems: == Physical Plan == Project [bedId#28,sensorId#30L,time#31,label#32,fft#34] BroadcastHashJoin [time#38], [time#31], BuildLeft Limit 498340 PhysicalRDD [time#38], MapPartitionsRDD[66] at explain at <console>:25 PhysicalRDD [bedId#28,sensorId#30L,fft#34,label#32,time#31], MapPartitionsRDD[68] at explain at <console>:25 Could Spark SQL developers suggest why it happens? Best regards, Alexander From: Stephen Carman [mailto:scar...@coldlight.com] Sent: Wednesday, June 24, 2015 12:33 PM To: Ulanov, Alexander Cc: CC GP; dev@spark.apache.org Subject: Re: Force inner join to shuffle the smallest table Have you tried shuffle compression? spark.shuffle.compress (true|false) if you have a filesystem capable also I’ve noticed file consolidation helps disk usage a bit. spark.shuffle.consolidateFiles (true|false) Steve On Jun 24, 2015, at 3:27 PM, Ulanov, Alexander <alexander.ula...@hp.com<mailto:alexander.ula...@hp.com>> wrote: It also fails, as I mentioned in the original question. From: CC GP [mailto:chandrika.gopalakris...@gmail.com] Sent: Wednesday, June 24, 2015 12:08 PM To: Ulanov, Alexander Cc: dev@spark.apache.org<mailto:dev@spark.apache.org> Subject: Re: Force inner join to shuffle the smallest table Try below and see if it makes a difference: val result = sqlContext.sql(“select big.f1, big.f2 from small inner join big on big.s=small.s and big.d=small.d”) On Wed, Jun 24, 2015 at 11:35 AM, Ulanov, Alexander <alexander.ula...@hp.com<mailto:alexander.ula...@hp.com>> wrote: Hi, I try to inner join of two tables on two fields(string and double). One table is 2B rows, the second is 500K. They are stored in HDFS in Parquet. Spark v 1.4. val big = sqlContext.paquetFile(“hdfs://big”) data.registerTempTable(“big”) val small = sqlContext.paquetFile(“hdfs://small”) data.registerTempTable(“small”) val result = sqlContext.sql(“select big.f1, big.f2 from big inner join small on big.s=small.s and big.d=small.d”) This query fails in the middle due to one of the workers “disk out of space” with shuffle reported 1.8TB which is the maximum size of my spark working dirs (on total 7 worker nodes). This is surprising, because the “big” table takes 2TB disk space (unreplicated) and “small” about 5GB and I would expect that optimizer will shuffle the small table. How to force Spark to shuffle the small table? I tried to write “small inner join big” however it also fails with 1.8TB of shuffle. Best regards, Alexander This e-mail is intended solely for the above-mentioned recipient and it may contain confidential or privileged information. If you have received it in error, please notify us immediately and delete the e-mail. You must not copy, distribute, disclose or take any action in reliance on it. In addition, the contents of an attachment to this e-mail may contain software viruses which could damage your own computer system. While ColdLight Solutions, LLC has taken every reasonable precaution to minimize this risk, we cannot accept liability for any damage which you sustain as a result of software viruses. You should perform your own virus checks before opening the attachment.