[ 
https://issues.apache.org/jira/browse/SPARK-11368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14997831#comment-14997831
 ] 

Jeff Zhang commented on SPARK-11368:
------------------------------------

Looks like an issue in the QueryPlan optimization step; I will work on this.
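For context, the fix would have the optimizer split a conjunctive filter into a deterministic, UDF-free part (which can be pushed down to drive partition pruning) and a residual part containing the Python UDF (which must run after the scan). A minimal sketch of that splitting logic in plain Python follows; the predicate representation and the names `split_conjuncts` / `references_udf` are illustrative, not Spark internals:

```python
# Sketch: split a conjunction of predicates into a pushable part
# (no Python UDF, so it can drive partition pruning) and a residual
# part that must be evaluated after the scan. Predicates are plain
# strings here purely for illustration, not Spark Expression trees.

def split_conjuncts(predicates, references_udf):
    """Partition `predicates` into (pushable, residual) lists."""
    pushable = [p for p in predicates if not references_udf(p)]
    residual = [p for p in predicates if references_udf(p)]
    return pushable, residual

# Example: the WHERE clause from the report as two conjuncts.
predicates = ["id >= 990", "multiply2(value) > 200000"]
pushable, residual = split_conjuncts(
    predicates, references_udf=lambda p: "multiply2" in p
)
# pushable  -> ["id >= 990"]                 (prunes partitions)
# residual  -> ["multiply2(value) > 200000"] (evaluated post-scan)
```

In the plan quoted below, the whole conjunction instead stays in a single Filter above the BatchPythonEvaluation node, so nothing reaches the scan for pruning.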

> Spark shouldn't scan all partitions when using Python UDF and filter over 
> partitioned column is given
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-11368
>                 URL: https://issues.apache.org/jira/browse/SPARK-11368
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>            Reporter: Maciej BryƄski
>            Priority: Critical
>
> Hi,
> I think this is a huge performance bug.
> I created a Parquet file partitioned by a column.
> Then I ran a query with a filter on the partition column plus a filter using a UDF.
> The result is that all partitions are scanned.
> Sample data:
> {code}
> rdd = sc.parallelize(range(0, 10000000)).map(lambda x: Row(id=x % 1000, value=x)).repartition(1)
> df = sqlCtx.createDataFrame(rdd)
> df.write.mode("overwrite").partitionBy('id').parquet('/mnt/mfs/udf_test')
> df = sqlCtx.read.parquet('/mnt/mfs/udf_test')
> df.registerTempTable('df')
> {code}
> Then queries:
> Without a UDF, Spark reads only 10 partitions:
> {code}
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and value > 100000').count()
> print(time.time() - start)
> 0.9993703365325928
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#22L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#25L])
>    Project
>     Filter (value#5L > 100000)
>      Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L]
> {code}
> With a UDF, Spark reads all the partitions:
> {code}
> sqlCtx.registerFunction('multiply2', lambda x: x * 2)
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and multiply2(value) > 200000').count()
> print(time.time() - start)
> 13.0826096534729
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#34L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#37L])
>    TungstenProject
>     Filter ((id#6 > 990) && (cast(pythonUDF#33 as double) > 200000.0))
>      !BatchPythonEvaluation PythonUDF#multiply2(value#5L), [value#5L,id#6,pythonUDF#33]
>       Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L,id#6]
> {code}
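The cost of the missed pushdown in the plan above can be illustrated with a toy model of Hive-style partition pruning over the directory layout produced by `partitionBy('id')` (one directory per distinct id, `id=0` through `id=999`); the `prune` helper here is illustrative, not a Spark API:

```python
# Toy model of partition pruning over the layout written by
# partitionBy('id'): one directory per distinct id value.
partitions = [f"id={i}" for i in range(1000)]

def prune(partitions, predicate):
    """Keep only partition directories whose id satisfies `predicate`."""
    return [p for p in partitions if predicate(int(p.split("=")[1]))]

# With `id >= 990` pushed down, only 10 directories are scanned ...
scanned = prune(partitions, lambda i: i >= 990)
# ... whereas the UDF-tainted plan above reads all 1000, which is
# consistent with the ~13x slowdown reported (13.08s vs 0.99s).
```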



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
