[ 
https://issues.apache.org/jira/browse/SPARK-13570?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15305896#comment-15305896
 ] 

Sean Owen commented on SPARK-13570:
-----------------------------------

It's hard to say whether this is a problem without knowing more about your 
environment. This could be normal if you have a huge amount of data, run on 
one machine, and are I/O bound. You would need to be more specific about what 
this is slow relative to. For example, it's not clear which operation takes 
6.5 mins. You should also try to pinpoint which operation (not just which 
stage) is taking so long. The report also doesn't specify the Spark version, 
etc.
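
One rough way to pinpoint the slow operation is to time each step separately. The sketch below is only an illustration: the `timed` helper, the `timings` dict, and the step labels are hypothetical names, not part of Spark, and the two workloads are stand-ins for the real load and save calls.

```python
import time
from contextlib import contextmanager

# Collected wall-clock timings, keyed by a free-form step label.
timings = {}

@contextmanager
def timed(label, results=timings):
    """Record the wall-clock seconds spent inside the with-block under `label`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        results[label] = time.perf_counter() - start

# Stand-in for a cheap step (in the reported job: a filter followed by an action).
with timed("filter"):
    kept = [x for x in range(100_000) if x % 2 == 0]

# Stand-in for an expensive step (in the reported job: the partitioned save).
with timed("write"):
    time.sleep(0.05)

# Print the slowest step first.
for label, seconds in sorted(timings.items(), key=lambda kv: -kv[1]):
    print(f"{label}: {seconds:.3f} s")
```

Because Spark evaluates transformations lazily, a timer like this is only meaningful around an action (for example a count or a write), not around the transformations that lead up to it.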

> pyspark save with partitionBy is very slow
> ------------------------------------------
>
>                 Key: SPARK-13570
>                 URL: https://issues.apache.org/jira/browse/SPARK-13570
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>            Reporter: Shubhanshu Mishra
>              Labels: dataframe, partitioning, pyspark, save
>
> Running the following code to store the data for each year and pos in a 
> separate folder for a very large dataframe is taking a huge amount of time 
> (more than 37 hours for 60% of the work).
> {code}
> ## IPYTHON was started using the following command: 
> # IPYTHON=1 "$SPARK_HOME/bin/pyspark" --driver-memory 50g 
> from pyspark import SparkContext, SparkConf
> from pyspark.sql import SQLContext, Row
> from pyspark.sql.types import *
> conf = SparkConf()
> conf.setMaster("local[30]")
> conf.setAppName("analysis")
> conf.set("spark.local.dir", "./tmp")
> conf.set("spark.executor.memory", "50g")
> conf.set("spark.driver.maxResultSize", "5g")
> sc = SparkContext(conf=conf)
> sqlContext = SQLContext(sc)
> df = sqlContext.read.format("csv").options(header=False, inferschema=True, 
> delimiter="\t").load("out/new_features")
> # `columns` is the list of target column names (not shown in this report)
> df = df.selectExpr(*("%s as %s" % (df.columns[i], k) for i,k in 
> enumerate(columns)))
> # year can take values from [1902,2015]
> # pos takes integer values from [-1,0,1,2]
> # df is a dataframe with 20 columns and 1 billion rows
> # Running on  Machine with 32 cores and 500 GB RAM
> df.write.save("out/model_input_partitioned", format="csv", 
> partitionBy=["year", "pos"], delimiter="\t")
> {code}
> Currently, the code is at: 
> [Stage 12:==============================>                    (1367 + 30) / 2290]
> It has already been running for more than 37 hours. A single pass over this 
> data to filter by value takes less than 6.5 minutes. 
> The Spark web interface shows the following lines for the 2 stages of the job:
> Stage  Description                               Submitted            Duration  Tasks: succeeded/total  Input    Output  Shuffle Read  Shuffle Write
> 11     load at NativeMethodAccessorImpl.java:-2  2016/02/27 23:07:04  6.5 min   2290/2290               66.8 GB
> 12     save at NativeMethodAccessorImpl.java:-2  2016/02/27 23:15:59  37.1 h    1370/2290               40.9 GB
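
To make "slow relative to what" concrete, the two stage rows quoted above can be turned into rough throughput figures. This is a back-of-the-envelope sketch, assuming the 66.8 GB and 40.9 GB figures are the data volume each stage had processed when the report was written; the variable names are illustrative only.

```python
# Figures copied from the two stage rows in the report.
load_gb, load_minutes = 66.8, 6.5          # stage 11: load, finished
save_gb, save_minutes = 40.9, 37.1 * 60    # stage 12: save, still running

# Data processed per minute of wall-clock time in each stage.
load_rate = load_gb / load_minutes
save_rate = save_gb / save_minutes

# How many times slower the save stage is, per GB processed.
slowdown = load_rate / save_rate

print(f"load: {load_rate:.2f} GB/min")
print(f"save: {save_rate:.4f} GB/min")
print(f"save is roughly {slowdown:.0f}x slower per GB")
```

A gap of this size suggests the time is going to something other than reading the input, but the numbers alone do not show which operation inside the save stage is responsible.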



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
