Maciej Bryński created SPARK-11368:
--------------------------------------
Summary: Spark scans all partitions when a Python UDF and a
filter over a partitioned column are given
Key: SPARK-11368
URL: https://issues.apache.org/jira/browse/SPARK-11368
Project: Spark
Issue Type: Bug
Components: PySpark, SQL
Reporter: Maciej Bryński
Priority: Critical
Hi,
I think this is a huge performance bug.
I created a Parquet file partitioned by a column, then ran a query combining a filter on the partition column with a UDF filter. The result is that all partitions are scanned.
Sample data:
{code}
from pyspark.sql import Row  # needed when not running in the PySpark shell

rdd = sc.parallelize(range(0, 10000000)).map(lambda x: Row(id=x % 1000, value=x)).repartition(1)
df = sqlCtx.createDataFrame(rdd)
df.write.mode("overwrite").partitionBy('id').parquet('/mnt/mfs/udf_test')
df = sqlCtx.read.parquet('/mnt/mfs/udf_test')
df.registerTempTable('df')
{code}
Then the queries.
Without the UDF, Spark reads only the 10 matching partitions:
{code}
import time  # needed when not running in the PySpark shell

start = time.time()
sqlCtx.sql('select * from df where id >= 990 and value > 100000').count()
print(time.time() - start)
0.9993703365325928
{code}
With the UDF, Spark reads all the partitions:
{code}
sqlCtx.registerFunction('multiply2', lambda x: x * 2)
start = time.time()
sqlCtx.sql('select * from df where id >= 990 and multiply2(value) > 200000').count()
print(time.time() - start)
13.0826096534729
{code}
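The timings above are consistent with the partition filter being dropped once a Python UDF appears in the WHERE clause. A toy model (plain Python, not Spark internals; the store layout and function names below are invented for illustration) of why an opaque UDF defeats pruning: the planner can evaluate a plain comparison on the partition column against partition metadata and skip whole partitions, but a Python callable is a black box it cannot inspect, so every partition must be read.

```python
# Hypothetical partitioned store: partition id -> list of values.
# 100 partitions of 10 rows each (invented data, not the Parquet file above).
partitions = {pid: list(range(pid * 10, pid * 10 + 10)) for pid in range(100)}

def scan(partition_min=None, row_predicate=None):
    """Return (rows_matched, partitions_read).

    partition_min models a declarative predicate on the partition column
    that the planner understands; row_predicate models an opaque UDF.
    """
    read = 0
    matched = 0
    for pid, rows in partitions.items():
        # Pruning: a plain comparison on the partition column is checked
        # against metadata only, so the partition is skipped entirely.
        if partition_min is not None and pid < partition_min:
            continue
        read += 1
        for v in rows:
            # An opaque callable can only be evaluated row by row,
            # after the partition has already been read.
            if row_predicate is None or row_predicate(v):
                matched += 1
    return matched, read

# Declarative predicate on the partition column: 10 of 100 partitions read.
_, read_plain = scan(partition_min=90)

# Opaque UDF-style predicate alone: nothing can be pruned, all 100 are read.
_, read_udf = scan(row_predicate=lambda v: v * 2 > 200)

print(read_plain, read_udf)  # 10 100
```

The bug reported here is that Spark behaves like the second case even though the query also contains the prunable `id >= 990` predicate, i.e. the presence of the UDF prevents the partition filter from being pushed down.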
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)