Maciej Bryński created SPARK-11368:
--------------------------------------

             Summary: Spark scans all partitions when using a Python UDF and a filter over a partitioned column is given
                 Key: SPARK-11368
                 URL: https://issues.apache.org/jira/browse/SPARK-11368
             Project: Spark
          Issue Type: Bug
          Components: PySpark, SQL
            Reporter: Maciej Bryński
            Priority: Critical


Hi,
I think this is a huge performance bug.

I created a Parquet file partitioned by a column.
Then I ran a query with a filter on the partition column combined with a filter that uses a UDF.
The result is that all partitions are scanned.

Sample data:
{code}
from pyspark.sql import Row

rdd = sc.parallelize(range(0, 10000000)).map(lambda x: Row(id=x % 1000, value=x)).repartition(1)
df = sqlCtx.createDataFrame(rdd)
df.write.mode("overwrite").partitionBy('id').parquet('/mnt/mfs/udf_test')
df = sqlCtx.read.parquet('/mnt/mfs/udf_test')
df.registerTempTable('df')
{code}
Then the queries.
Without the UDF, Spark reads only 10 partitions:
{code}
start = time.time()
sqlCtx.sql('select * from df where id >= 990 and value > 100000').count()
print(time.time() - start)

0.9993703365325928
{code}
With the UDF, Spark reads all the partitions:
{code}
sqlCtx.registerFunction('multiply2', lambda x: x * 2)
start = time.time()
sqlCtx.sql('select * from df where id >= 990 and multiply2(value) > 200000').count()
print(time.time() - start)

13.0826096534729
{code}
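For illustration only, here is a minimal pure-Python sketch (no Spark required; all names are hypothetical, not Spark internals) of the behaviour being reported: a predicate on the partition column is a plain expression the planner can evaluate against partition values before scanning, while a Python UDF is an opaque callable it cannot inspect, so combining the two into one filter forces a scan of every partition.

{code}
# 10 partitions keyed by id; each holds a small list of values.
partitions = {pid: list(range(pid, pid + 5)) for pid in range(10)}

multiply2 = lambda x: x * 2  # the opaque UDF

# Pruned plan: evaluate the partition-column predicate first, so the
# UDF only ever runs on rows from the surviving partitions.
scanned_pruned = 0
for pid, values in partitions.items():
    if not pid >= 8:            # partition filter: prunes before any scan
        continue
    for v in values:
        scanned_pruned += 1
        multiply2(v) > 10       # UDF filter applied only to remaining rows

# Unpruned plan (what this report observes): every partition is scanned
# because the combined filter is treated as one opaque predicate.
scanned_full = sum(len(vs) for vs in partitions.values())

print(scanned_pruned, scanned_full)  # 10 rows scanned vs. 50
{code}

Ideally the optimizer would split the conjunction, push the {{id >= 990}} part down for partition pruning, and apply only the UDF part after the scan.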



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
