[ https://issues.apache.org/jira/browse/SPARK-4315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14227427#comment-14227427 ]

Adam Davison commented on SPARK-4315:
-------------------------------------

Sure, will try to say what I can. Unfortunately I don't think I can easily give 
you a sample of the data. If we can't figure it out I can try to produce a fake 
sample that still exhibits the problem. But first let me try to come up with a 
few possibly salient differences:

1. My data is very wide, about 80 columns.
2. The size of the resulting groups in the groupBy is very ragged, whereas 
yours here is very even. In my case it's probably exponentially distributed, 
or more extreme. I wonder if this is generating many different Row "types" 
somehow.
3. My Row objects are constructed via the parquet functions of SQLContext.

While debugging, I printed the number of items in the dict and saw the cache 
reaching hundreds of entries or more.
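
For what it's worth, my mental model of _restore_object is a dict that maps a 
schema to a dynamically generated Row class. A purely illustrative sketch of 
that pattern (the names below are made up, not the actual pyspark internals):

_cls_cache = {}

def make_row_cls(schema):
    # stand-in for the expensive step (_create_cls in pyspark/sql.py):
    # dynamically building a Row-like class for this schema
    return type("Row_%d" % len(_cls_cache), (tuple,), {"fields": tuple(schema)})

def restore(schema, values):
    key = tuple(schema)  # the cache only helps if equal schemas give equal keys
    cls = _cls_cache.get(key)
    if cls is None:
        cls = make_row_cls(schema)
        _cls_cache[key] = cls
    return cls(values)

row = restore(["user", "country"], ("u1", "GB"))
print len(_cls_cache)  # this is the count that reached hundreds in my runs

If the schema/datatype objects presented during unpickling don't compare equal 
even when they describe the same 80 columns, every miss pays the full 
class-generation cost and the dict keeps growing, which would fit the profile 
I'm seeing.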

I'll also include part of the code I was using:

import time

import pyspark
import pyspark.sql

conf = pyspark.SparkConf().setMaster("local[24]").setAppName("test")
sc = pyspark.SparkContext(conf=conf)
sqlc = pyspark.sql.SQLContext(sc)
data = sqlc.parquetFile("/home/adam/parquet_test")

def getnow():
    # current wall-clock time in milliseconds
    return int(round(time.time() * 1000))

def applyfunc2(data):
    # data is a (key, iterable of Rows) pair produced by groupBy
    # <some work which returns a list object>
    return []

print "CHECKPOINT 1: %i" % (getnow())
data.cache()
# introduced to separate the timing of the disk load from the computation
junk = data.map(lambda x: 0).collect()
print "CHECKPOINT 2: %i" % (getnow())
grouped = data.groupBy(lambda x: x.unique_user_identifier)
print "CHECKPOINT 3: %i" % (getnow())
calced = grouped.flatMap(applyfunc2)
print "CHECKPOINT 4: %i" % (getnow())
counts = calced.collect()
print "CHECKPOINT 5: %i" % (getnow())

> PySpark pickling of pyspark.sql.Row objects is extremely inefficient
> --------------------------------------------------------------------
>
>                 Key: SPARK-4315
>                 URL: https://issues.apache.org/jira/browse/SPARK-4315
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.1.0
>         Environment: Ubuntu, Python 2.7, Spark 1.1.0
>            Reporter: Adam Davison
>
> Working with an RDD of pyspark.sql.Row objects, created by reading a file 
> with SQLContext in a local PySpark context.
> Operations on the RDD, such as data.groupBy(lambda x: x.field_name), are 
> extremely slow (more than 10x slower than an equivalent Scala/Spark 
> implementation). Obviously I expected it to be somewhat slower, but the 
> difference was so huge that I did a bit of digging.
> Luckily it's fairly easy to add profiling to the Python workers. I see that 
> the vast majority of time is spent in:
> spark-1.1.0-bin-cdh4/python/pyspark/sql.py:757(_restore_object)
> It seems that this line attempts to accelerate pickling of Rows with the use 
> of a cache. Some debugging reveals that this cache becomes quite big (100s of 
> entries). Disabling the cache by adding:
> return _create_cls(dataType)(obj)
> as the first line of _restore_object made my query run 5x faster, implying 
> that the caching is not providing the desired acceleration.
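
For reference, a sketch of the workaround described above, i.e. short-circuiting 
the cache inside pyspark/sql.py (the real _restore_object in 1.1.0 has more 
around it, and _create_cls is the class-construction helper already defined in 
that module, so treat this as illustrative only):

def _restore_object(dataType, obj):
    # bypass the class cache and rebuild the Row class on every call;
    # per the report above this was ~5x faster than the cached lookup
    return _create_cls(dataType)(obj)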


