Your coalesce should technically work. One thing to check is the overhead memory (spark.yarn.executor.memoryOverhead on YARN with Spark 2.0); you should configure it as about 10% of executor memory. You might also need to increase spark.driver.maxResultSize. The data size looks fine for the cluster unless your join yields more than 6 GB of data. A few things to try:
1. Can you cache both DFs and try again?
2. Can you cache both, then use the Scala join on the DFs instead of going through SQL?
3. Can you try a Dataset instead of a DataFrame? (assuming you are on Spark 2.0)
4. If you still see errors, can you check the YARN logs, or post them here?

I am sorry I don't know the answer to this, but I am pretty sure there should be a way to work with fragmented files too.

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Thursday, November 10, 2016 12:20 AM
To: Shreya Agarwal <shrey...@microsoft.com>
Cc: user.spark <user@spark.apache.org>
Subject: Re:RE: how to merge dataframe write output files

Thank you for the reply, Shreya:

It's because the files are too small, and HDFS doesn't like small files.
As for your question: yes, I want to create an ExternalTable on the parquet file folder. And how do I use fragmented files, as you mention?

The test case is as below:

bin/spark-shell --master yarn --deploy-mode client --driver-memory 6g --executor-memory 8g --executor-cores 2 --num-executors 4

val df = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/")
df.createOrReplaceTempView("biztag")    // almost 70M, 3673411 rows
val df2 = spark.read.parquet("/parquetdata/weixin/biz/month=201608")
df2.createOrReplaceTempView("biz1608")  // almost 90M, 381700 rows
for (i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*,biztag.tag_id from biz1608 left join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(1).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
}

At 2016-11-10 15:47:02, "Shreya Agarwal" <shrey...@microsoft.com> wrote:

Is there a reason you want to merge the files? As far as I know, you are getting errors because coalescing to one partition and then writing forces all the content to reside on one executor, and the size of the data exceeds the memory you have for storage in that executor, which causes the container to be killed. We can confirm this if you provide the specs of your cluster.
The whole purpose of multiple files is that each executor can write its partition out in parallel, without having to collect the data in one place. Merging into one file will also make your write incredibly slow, and it will take away the speed of reading the data back from Parquet, as there won't be any parallelism at input time (if you try to read this Parquet file back in).

Again, the important question is: why do you need it to be one file? Are you planning to use it externally? If yes, can you not use fragmented files there? If the data is too big for the Spark executor, it will most certainly be too much for a JRE or any other runtime to load in memory on a single box.

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark <user@spark.apache.org>
Subject: how to merge dataframe write output files

Hi all:

When I call the API df.write.parquet, I get a lot of small files. How can I merge them into one file? I tried df.coalesce(1).write.parquet, but it sometimes fails with this error:

Container exited with a non-zero exit code 143

more and more...
-rw-r--r-- 2 hadoop supergroup 14.5 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 16.4 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 17.1 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 14.2 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 15.7 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 14.4 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 17.1 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 15.7 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 16.0 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 17.1 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00174-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 14.0 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00175-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r-- 2 hadoop supergroup 15.7 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00176-0f61afe4-23e8-40bb-b30b-09652ca677bc

more and more...

2016-11-10
________________________________
lk_spark
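
One possible way to avoid both the many small files and the coalesce(1) from the test case in this thread is to run the join once for all tags and let Spark write one directory per tag. This is only a sketch under assumptions, not something that was tested on this cluster: it assumes Spark 2.0 in spark-shell, that tag_id is numeric and biz1608 has no tag_id column of its own, and the output path /parquetdata/weixin/biztags_by_tag is hypothetical. Note that the layout changes: the output lands in tag_id=1, tag_id=2, ... subdirectories rather than biztag1, biztag2, ..., and tag_id is encoded in the directory name instead of being stored inside the files.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

// In spark-shell a session already exists; getOrCreate() just returns it.
val spark = SparkSession.builder().getOrCreate()

// Same inputs and temp views as in the test case from the thread.
spark.read.parquet("/parquetdata/weixin/biz-tag-relation/")
  .createOrReplaceTempView("biztag")
spark.read.parquet("/parquetdata/weixin/biz/month=201608")
  .createOrReplaceTempView("biz1608")

// One query covering all 61 tags instead of one query per tag.
val joined = spark.sql(
  """select biz1608.*, biztag.tag_id
    |from biz1608 left join biztag on biz1608.bid = biztag.biz_id
    |where biztag.tag_id between 1 and 61""".stripMargin)

joined
  // Hash-partition by tag_id so all rows of one tag sit in the same task.
  .repartition(col("tag_id"))
  .write
  // One subdirectory per tag value: .../tag_id=1, .../tag_id=2, ...
  .partitionBy("tag_id")
  // Hypothetical output path, chosen only for this illustration.
  .parquet("/parquetdata/weixin/biztags_by_tag")
```

Because repartition(col("tag_id")) puts all rows of a tag into the same partition, each tag_id=N directory should end up with a single file, while different tags can still be written by different tasks in parallel rather than being forced through one executor.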