[
https://issues.apache.org/jira/browse/SPARK-11368?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Davies Liu resolved SPARK-11368.
--------------------------------
Resolution: Fixed
Assignee: Davies Liu
Fix Version/s: 2.0.0
This was fixed by https://issues.apache.org/jira/browse/SPARK-14581
> Spark shouldn't scan all partitions when using a Python UDF and a filter over a partitioned column
> --------------------------------------------------------------------------------------------------
>
> Key: SPARK-11368
> URL: https://issues.apache.org/jira/browse/SPARK-11368
> Project: Spark
> Issue Type: Bug
> Components: PySpark, SQL
> Reporter: Maciej Bryński
> Assignee: Davies Liu
> Priority: Critical
> Fix For: 2.0.0
>
>
> Hi,
> I think this is a huge performance bug.
> I created a Parquet file partitioned by a column, then ran a query combining a filter over the partition column with a filter that uses a Python UDF.
> The result is that all partitions are scanned.
> Sample data:
> {code}
> from pyspark.sql import Row
>
> rdd = sc.parallelize(range(0, 10000000)).map(lambda x: Row(id=x % 1000, value=x)).repartition(1)
> df = sqlCtx.createDataFrame(rdd)
> df.write.mode("overwrite").partitionBy('id').parquet('/mnt/mfs/udf_test')
> df = sqlCtx.read.parquet('/mnt/mfs/udf_test')
> df.registerTempTable('df')
> {code}
> Then the queries:
> Without a UDF, Spark reads only 10 partitions:
> {code}
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and value > 100000').count()
> print(time.time() - start)
> 0.9993703365325928
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#22L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#25L])
>    Project
>     Filter (value#5L > 100000)
>      Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L]
> {code}
> With a UDF, Spark reads all the partitions:
> {code}
> sqlCtx.registerFunction('multiply2', lambda x: x * 2)
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and multiply2(value) > 200000').count()
> print(time.time() - start)
> 13.0826096534729
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#34L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#37L])
>    TungstenProject
>     Filter ((id#6 > 990) && (cast(pythonUDF#33 as double) > 200000.0))
>      !BatchPythonEvaluation PythonUDF#multiply2(value#5L), [value#5L,id#6,pythonUDF#33]
>       Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L,id#6]
> {code}
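The expected behavior can be sketched in plain Python. This is an illustrative model only, not Spark's implementation: a deterministic predicate on the partition column can select partitions before any rows are read, while an opaque Python UDF predicate can only run on rows after the scan. The data and the UDF threshold are scaled-down stand-ins for the sample above.

```python
# Hypothetical partitioned layout modeled as a dict: partition-column
# value (id) -> rows of (id, value). Scaled down from the report's
# sample data (here value ranges over 0..99999, id = value % 1000).
partitions = {pid: [(pid, v) for v in range(pid, 100_000, 1000)]
              for pid in range(1000)}

def multiply2(x):
    # the UDF from the report
    return x * 2

# Step 1: partition pruning. The deterministic predicate id >= 990 is
# evaluated against partition values alone, before any rows are scanned.
pruned = {pid: rows for pid, rows in partitions.items() if pid >= 990}
print(len(pruned))  # 10 of 1000 partitions survive pruning

# Step 2: the opaque UDF predicate runs only over rows from the
# surviving partitions (threshold scaled to this smaller data set).
matches = [row for rows in pruned.values()
           for row in rows if multiply2(row[1]) > 100_000]
print(len(matches))
```

The point of the report is that Spark should still perform step 1 even when the remaining predicate contains a Python UDF, instead of scanning every partition.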
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]