RE: Re:RE: how to merge dataframe write output files

2016-11-10 Thread Mendelson, Assaf
As people have stated, when you coalesce to 1 partition you basically lose all
parallelism; however, you can coalesce to a different value.
If, for example, you coalesce to 20, then you can parallelize up to 20 different
tasks.
You have a total of 4 executors with 2 cores each, which means you basically
have a core parallelism of 8. In general it is best to have 2-3 times that many
tasks for better distribution, so ~20 tasks would be a good idea. Looking at
your output I see part 00176, which I guess means you have on the order of 200
tasks (the default parallelism when you have a shuffle, for example).
Coalescing to 20 would still give you enough parallelism to use your cluster,
and would give you fewer, larger files.
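A minimal sketch of what that could look like for the loop in the test case
quoted below (paths and view names are taken from that post; 20 is just the
2-3x-cores figure above, not a measured value):

// Same query as in the original loop, but coalescing to ~20 partitions
// (4 executors x 2 cores x ~2.5) instead of 1, so the write stays parallel
// while still producing far fewer files than the ~200 default shuffle tasks.
for (i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*, biztag.tag_id from biz1608 left join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(20).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
}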
Assaf.


From: Shreya Agarwal [mailto:shrey...@microsoft.com]
Sent: Thursday, November 10, 2016 10:28 AM
To: lk_spark
Cc: user.spark
Subject: RE: Re:RE: how to merge dataframe write output files

Your coalesce should technically work - one thing to check would be the overhead
memory; you should configure it as ~10% of executor memory. Also, you might need
to increase maxResultSize. The data volume also looks fine for the cluster,
unless your join yields >6G worth of data. A few things to try -

  1.  Can you cache both DFs and try again?
  2.  Can you cache both, then join the DFs through the Scala/DataFrame API
instead of going through SQL? (a rough sketch of 1 and 2 follows below)
  3.  Can you try Dataset instead of DataFrame? (assuming you are on Spark 2.0)
  4.  If you still see errors, can you check the YARN logs, or post them here?

I am sorry I don't know the answer to this, but I am pretty sure there should be
a way to work with the fragmented files too.
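A rough sketch of suggestions 1 and 2 plus the memory settings above, assuming
Spark 2.0 on YARN; the overhead and maxResultSize values below are only
placeholders, not tuned recommendations:

// Launch with the extra memory settings (placeholder values):
// bin/spark-shell --master yarn --deploy-mode client \
//   --driver-memory 6g --executor-memory 8g --executor-cores 2 --num-executors 4 \
//   --conf spark.yarn.executor.memoryOverhead=1024 \
//   --conf spark.driver.maxResultSize=2g

// 1. Cache both DataFrames before joining.
val biztag  = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/").cache()
val biz1608 = spark.read.parquet("/parquetdata/weixin/biz/month=201608").cache()

// 2. Join through the DataFrame API instead of temp views + SQL;
//    the filter mirrors the "where biztag.tag_id = ${i}" predicate for one tag.
val joined = biz1608.join(biztag, biz1608("bid") === biztag("biz_id"), "left")
  .filter(biztag("tag_id") === 1)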

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Thursday, November 10, 2016 12:20 AM
To: Shreya Agarwal <shrey...@microsoft.com>
Cc: user.spark <user@spark.apache.org>
Subject: Re:RE: how to merge dataframe write output files

Thank you for the reply, Shreya:
It's because the files are too small, and HDFS doesn't handle lots of small
files well.
For your question: yes, I want to create an external table on the parquet file
folder. And how would I use the fragmented files, as you mention?

The test case is as below:
bin/spark-shell --master yarn --deploy-mode client --driver-memory 6g \
  --executor-memory 8g --executor-cores 2 --num-executors 4
val df = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/")
df.createOrReplaceTempView("biztag")    // almost 70M, 3673411 rows
val df2 = spark.read.parquet("/parquetdata/weixin/biz/month=201608")
df2.createOrReplaceTempView("biz1608")  // almost 90M, 381700 rows
for (i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*, biztag.tag_id from biz1608 left join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(1).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
}



At 2016-11-10 15:47:02, "Shreya Agarwal" 
<shrey...@microsoft.com> wrote:
Is there a reason you want to merge the files? The reason you are getting
errors (afaik) is that when you coalesce and then write, you force all the
content to reside on one executor, and the size of the data exceeds the storage
memory you have on that executor, which causes the container to be killed. We
can confirm this if you provide the specs of your cluster. The whole purpose of
multiple files is that each executor can write its partition out in parallel,
without having to collect the data in one place.

Not to mention that it'll make your write incredibly slow, and it'll also take
away all the speed of reading the data back in from parquet, as there won't be
any parallelism at input time (if you try to read this parquet back).

Again, the important question is - why do you need it to be one file? Are you
planning to use it externally? If yes, can you not use the fragmented files
there? If the data is too big for the Spark executor, it'll most certainly be
too much for the JRE or any other runtime to load in memory on a single box.
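If the goal is just an external table over the output folder, the fragmented
files can be pointed at directly; a minimal sketch against the Spark 2.0 catalog
API, where "biztag2_ext" is only an example table name:

// Register an external table over the whole folder of part files; Spark reads
// every parquet file underneath it, so there is no need to merge them into one.
spark.catalog.createExternalTable(
  "biztag2_ext",
  "/parquetdata/weixin/biztags/biztag2",
  "parquet")

spark.sql("select count(*) from biztag2_ext").show()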

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark <user@spark.apache.org>
Subject: how to merge dataframe write output files

Hi all:
When I call the API df.write.parquet, there are a lot of small files. How can
I merge them into one file? I tried df.coalesce(1).write.parquet, but it
sometimes fails with an error:

Container exited with a non-zero exit code 143
more and more...
-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-

RE: Re:RE: how to merge dataframe write output files

2016-11-10 Thread Shreya Agarwal
Your coalesce should technically work - one thing to check would be the overhead
memory; you should configure it as ~10% of executor memory. Also, you might need
to increase maxResultSize. The data volume also looks fine for the cluster,
unless your join yields >6G worth of data. A few things to try -

  1.  Can you cache both DFs and try again?
  2.  Can you cache both, then join the DFs through the Scala/DataFrame API
instead of going through SQL?
  3.  Can you try Dataset instead of DataFrame? (assuming you are on Spark 2.0;
a rough sketch follows below)
  4.  If you still see errors, can you check the YARN logs, or post them here?

I am sorry I don't know the answer to this, but I am pretty sure there should be
a way to work with the fragmented files too.
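For suggestion 3, a minimal sketch of what the typed Dataset route could look
like on Spark 2.0; the case classes here only guess at the columns named in the
thread (bid, biz_id, tag_id), not the real schemas:

// Illustrative case classes -- only the join/filter columns are known from the
// thread, so these are placeholders for the real schemas.
case class Biz(bid: String)
case class BizTag(biz_id: String, tag_id: Int)

import spark.implicits._

val bizDs    = spark.read.parquet("/parquetdata/weixin/biz/month=201608").as[Biz]
val bizTagDs = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/").as[BizTag]

// Typed join for a single tag; yields a Dataset[(Biz, BizTag)] rather than an
// untyped DataFrame of Rows.
val tagged = bizTagDs.filter(_.tag_id == 1)
val joined = bizDs.joinWith(tagged, bizDs("bid") === tagged("biz_id"))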

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Thursday, November 10, 2016 12:20 AM
To: Shreya Agarwal <shrey...@microsoft.com>
Cc: user.spark <user@spark.apache.org>
Subject: Re:RE: how to merge dataframe write output files

Thank you for the reply, Shreya:
It's because the files are too small, and HDFS doesn't handle lots of small
files well.
For your question: yes, I want to create an external table on the parquet file
folder. And how would I use the fragmented files, as you mention?

The test case is as below:
bin/spark-shell --master yarn --deploy-mode client --driver-memory 6g \
  --executor-memory 8g --executor-cores 2 --num-executors 4
val df = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/")
df.createOrReplaceTempView("biztag")    // almost 70M, 3673411 rows
val df2 = spark.read.parquet("/parquetdata/weixin/biz/month=201608")
df2.createOrReplaceTempView("biz1608")  // almost 90M, 381700 rows
for (i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*, biztag.tag_id from biz1608 left join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(1).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
}



At 2016-11-10 15:47:02, "Shreya Agarwal" 
<shrey...@microsoft.com> wrote:

Is there a reason you want to merge the files? The reason you are getting
errors (afaik) is that when you coalesce and then write, you force all the
content to reside on one executor, and the size of the data exceeds the storage
memory you have on that executor, which causes the container to be killed. We
can confirm this if you provide the specs of your cluster. The whole purpose of
multiple files is that each executor can write its partition out in parallel,
without having to collect the data in one place.

Not to mention that it'll make your write incredibly slow, and it'll also take
away all the speed of reading the data back in from parquet, as there won't be
any parallelism at input time (if you try to read this parquet back).

Again, the important question is - why do you need it to be one file? Are you
planning to use it externally? If yes, can you not use the fragmented files
there? If the data is too big for the Spark executor, it'll most certainly be
too much for the JRE or any other runtime to load in memory on a single box.

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark <user@spark.apache.org>
Subject: how to merge dataframe write output files

Hi all:
When I call the API df.write.parquet, there are a lot of small files. How can
I merge them into one file? I tried df.coalesce(1).write.parquet, but it
sometimes fails with an error:

Container exited with a non-zero exit code 143
more and more...
-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.2 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.0 K 2016-1

Re:RE: how to merge dataframe write output files

2016-11-10 Thread lk_spark
Thank you for the reply, Shreya:
It's because the files are too small, and HDFS doesn't handle lots of small
files well.
For your question: yes, I want to create an external table on the parquet file
folder. And how would I use the fragmented files, as you mention?


The test case is as below:
bin/spark-shell --master yarn --deploy-mode client --driver-memory 6g \
  --executor-memory 8g --executor-cores 2 --num-executors 4
val df = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/")
df.createOrReplaceTempView("biztag")    // almost 70M, 3673411 rows
val df2 = spark.read.parquet("/parquetdata/weixin/biz/month=201608")
df2.createOrReplaceTempView("biz1608")  // almost 90M, 381700 rows
for (i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*, biztag.tag_id from biz1608 left join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(1).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
}
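A sketch of one further option, not raised in the thread: doing the join once
and letting partitionBy split the output by tag_id avoids the 61 separate
coalesce(1) writes; the layout becomes tag_id=N subdirectories rather than
biztagN folders, and the output path below is only an example:

// Join once over all tags instead of once per tag.
import org.apache.spark.sql.functions.col

val allTags = spark.sql(
  "select biz1608.*, biztag.tag_id from biz1608 " +
  "left join biztag on biz1608.bid = biztag.biz_id " +
  "where biztag.tag_id between 1 and 61")

// repartition(col("tag_id")) sends each tag's rows to a single task, so every
// tag_id=N directory ends up with one output file of reasonable size.
allTags
  .repartition(col("tag_id"))
  .write
  .partitionBy("tag_id")
  .parquet("/parquetdata/weixin/biztags-partitioned")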





At 2016-11-10 15:47:02, "Shreya Agarwal"  wrote:


Is there a reason you want to merge the files? The reason you are getting
errors (afaik) is that when you coalesce and then write, you force all the
content to reside on one executor, and the size of the data exceeds the storage
memory you have on that executor, which causes the container to be killed. We
can confirm this if you provide the specs of your cluster. The whole purpose of
multiple files is that each executor can write its partition out in parallel,
without having to collect the data in one place.

Not to mention that it'll make your write incredibly slow, and it'll also take
away all the speed of reading the data back in from parquet, as there won't be
any parallelism at input time (if you try to read this parquet back).

Again, the important question is - why do you need it to be one file? Are you
planning to use it externally? If yes, can you not use the fragmented files
there? If the data is too big for the Spark executor, it'll most certainly be
too much for the JRE or any other runtime to load in memory on a single box.

 

From: lk_spark [mailto:lk_sp...@163.com]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark 
Subject: how to merge dataframe write output files

 

Hi all:

When I call the API df.write.parquet, there are a lot of small files. How can
I merge them into one file? I tried df.coalesce(1).write.parquet, but it
sometimes fails with an error:

Container exited with a non-zero exit code 143

more and more...

-rw-r--r--   2 hadoop supergroup 14.5 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.2 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.4 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 16.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 17.1 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00174-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 14.0 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00175-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup 15.7 K 2016-11-10 15:11 
/parquetdata/weixin/biztags/biztag2/part-r-00176-0f61afe4-23e8-40bb-b30b-09652ca677bc

more and more...

2016-11-10

lk_spark