[ https://issues.apache.org/jira/browse/SPARK-11368?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14997831#comment-14997831 ]
Jeff Zhang commented on SPARK-11368:
------------------------------------

Looks like an issue in the QueryPlan optimization step; will work on this.

> Spark shouldn't scan all partitions when using Python UDF and filter over
> partitioned column is given
> -----------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-11368
>                 URL: https://issues.apache.org/jira/browse/SPARK-11368
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark, SQL
>            Reporter: Maciej Bryński
>            Priority: Critical
>
> Hi,
> I think this is a huge performance bug.
> I created a Parquet file partitioned by a column.
> Then I ran a query with a filter over the partition column plus a filter with a UDF.
> The result is that all partitions are scanned.
> Sample data:
> {code}
> rdd = sc.parallelize(range(0, 10000000)).map(lambda x: Row(id=x % 1000, value=x)).repartition(1)
> df = sqlCtx.createDataFrame(rdd)
> df.write.mode("overwrite").partitionBy('id').parquet('/mnt/mfs/udf_test')
> df = sqlCtx.read.parquet('/mnt/mfs/udf_test')
> df.registerTempTable('df')
> {code}
> Then queries:
> Without the UDF, Spark reads only 10 partitions:
> {code}
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and value > 100000').count()
> print(time.time() - start)
> 0.9993703365325928
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#22L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#25L])
>    Project
>     Filter (value#5L > 100000)
>      Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L]
> {code}
> With the UDF, Spark reads all the partitions:
> {code}
> sqlCtx.registerFunction('multiply2', lambda x: x * 2)
> start = time.time()
> sqlCtx.sql('select * from df where id >= 990 and multiply2(value) > 200000').count()
> print(time.time() - start)
> 13.0826096534729
>
> == Physical Plan ==
> TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[count#34L])
>  TungstenExchange SinglePartition
>   TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[currentCount#37L])
>    TungstenProject
>     Filter ((id#6 > 990) && (cast(pythonUDF#33 as double) > 200000.0))
>      !BatchPythonEvaluation PythonUDF#multiply2(value#5L), [value#5L,id#6,pythonUDF#33]
>       Scan ParquetRelation[file:/mnt/mfs/udf_test][value#5L,id#6]
> {code}

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org
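To make the report concrete without a Spark cluster, here is a minimal pure-Python sketch of what partition pruning does and why it matters here. This is illustrative only, not Spark internals or its API; all names (`table`, `scan`, the `19900` threshold) are invented for the example. The point is that the `id >= 990` conjunct refers only to the partition column and could be pushed into the scan, while keeping the whole conjunction together (because it contains a UDF) forces a scan of every partition.

```python
# Illustrative sketch (not Spark code): a partitioned "table" is a dict
# mapping the partition-column value to that partition's rows.
table = {pid: list(range(pid * 10, pid * 10 + 10)) for pid in range(1000)}

scanned = []  # records which partitions were actually read

def scan(partition_filter=None):
    """Scan partitions, skipping any whose partition value fails the
    partition-only predicate (this is partition pruning)."""
    scanned.clear()
    for pid, rows in table.items():
        if partition_filter is not None and not partition_filter(pid):
            continue  # pruned: this partition is never read
        scanned.append(pid)
        yield from ((pid, v) for v in rows)

multiply2 = lambda x: x * 2  # stand-in for the Python UDF

# Pruned plan: `id >= 990` is pushed into the scan; the UDF conjunct
# is then evaluated only on rows from the surviving partitions.
pruned = [row for row in scan(lambda pid: pid >= 990)
          if multiply2(row[1]) > 19900]
print(len(scanned))  # 10 partitions read

# Unpruned plan (what this ticket reports): the whole conjunction is
# evaluated above the scan, so every partition is read for the same result.
unpruned = [row for row in scan()
            if row[0] >= 990 and multiply2(row[1]) > 19900]
print(len(scanned))  # 1000 partitions read
```

Both plans return identical rows; only the number of partitions touched differs, which matches the ~13x runtime gap shown in the ticket.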