Nasir Ali created SPARK-30162:
---------------------------------

             Summary: Filters are not being pushed down for Parquet files
                 Key: SPARK-30162
                 URL: https://issues.apache.org/jira/browse/SPARK-30162
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 3.0.0
         Environment: pyspark 3.0 preview

Ubuntu/Centos

pyarrow 0.14.1 
            Reporter: Nasir Ali


Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" 
method also differs from 2.4, which makes it hard to tell in 3.0 whether filters 
were pushed down or not. The code below reproduces the bug:

 
{code:python}
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
                            ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
                            ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
                            ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
                            ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
                            ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
                            ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
                            ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2",25.00, "2018-03-16T11:27:18+00:00")
                            ],
                           ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/cnali/data/")

df2 = spark.read.load("/home/cnali/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet

== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], ReadSchema: struct<id:double,ts:timestamp>{code}
{code}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Optimized Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Physical Plan ==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: InMemoryFileIndex[file:/home/cnali/data], ReadSchema: struct<id:double,ts:timestamp>
{code}
I have also tested this on a much larger dataset: Spark 3.0 loads the whole 
dataset and then applies the filter, whereas Spark 2.4 pushes the filter down. 
The output above shows that Spark 2.4 applied the partition filter, but the 
Spark 3.0 preview did not.
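Since the 3.0 preview's explain output no longer prints PartitionFilters/PushedFilters entries for the Parquet scan, it is easy to misread. A small standalone helper (illustrative only, not part of Spark; it assumes filter expressions contain no commas or square brackets) can scan the physical-plan text for those entries:

```python
import re

def scan_filters(physical_plan):
    """Pull the PartitionFilters and PushedFilters lists out of the text of a
    physical plan (the string printed by df.explain())."""
    found = {}
    for key in ("PartitionFilters", "PushedFilters"):
        # Match e.g. "PartitionFilters: [isnotnull(user#40), (user#40 = usr2)]"
        m = re.search(key + r":\s*\[([^\]]*)\]", physical_plan)
        if m:
            inner = m.group(1).strip()
            found[key] = [f.strip() for f in inner.split(",")] if inner else []
    return found

# Scan node from the Spark 2.4 plan above (abbreviated):
spark24_scan = ("FileScan parquet [id#38,ts#39,user#40] Batched: true, "
                "PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], "
                "PushedFilters: [], ReadSchema: struct<id:double,ts:timestamp>")
print(scan_filters(spark24_scan))
# -> {'PartitionFilters': ['isnotnull(user#40)', '(user#40 = usr2)'], 'PushedFilters': []}

# The 3.0.0-preview BatchScan line reports neither entry, so the result is empty:
spark30_scan = ("BatchScan[id#0, ts#1, user#2] ParquetScan Location: "
                "InMemoryFileIndex[file:/home/cnali/data]")
print(scan_filters(spark30_scan))
# -> {}
```

Running this over both plans makes the regression visible at a glance: the 2.4 scan reports a partition filter, while the 3.0 preview scan reports nothing at all.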

 

Minor: in Spark 3.0 the explain output is truncated (maybe a fixed length?), which 
makes debugging hard. Setting spark.sql.orc.cache.stripe.details.size=10000 doesn't 
help.

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
