Nasir Ali created SPARK-30162:
---------------------------------
Summary: Filters are not being pushed down for Parquet files
Key: SPARK-30162
URL: https://issues.apache.org/jira/browse/SPARK-30162
Project: Spark
Issue Type: Bug
Components: SQL
Affects Versions: 3.0.0
Environment: pyspark 3.0 preview
Ubuntu/Centos
pyarrow 0.14.1
Reporter: Nasir Ali
Filters are not pushed down in the Spark 3.0 preview. The output of the
"explain" method is also different, which makes it hard to tell in 3.0 whether
filters were pushed down or not. The code below reproduces the bug:
{code:python}
df = spark.createDataFrame([("usr1", 17.00, "2018-03-10T15:27:18+00:00"),
                            ("usr1", 13.00, "2018-03-11T12:27:18+00:00"),
                            ("usr1", 25.00, "2018-03-12T11:27:18+00:00"),
                            ("usr1", 20.00, "2018-03-13T15:27:18+00:00"),
                            ("usr1", 17.00, "2018-03-14T12:27:18+00:00"),
                            ("usr2", 99.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2", 156.00, "2018-03-22T11:27:18+00:00"),
                            ("usr2", 17.00, "2018-03-31T11:27:18+00:00"),
                            ("usr2", 25.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2", 25.00, "2018-03-16T11:27:18+00:00")
                            ],
                           ["user", "id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/cnali/data/")

df2 = spark.read.load("/home/cnali/data/")
df2.filter("user=='usr2'").explain(True)
{code}
{code}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet

== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], ReadSchema: struct<id:double,ts:timestamp>
{code}
{code}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Optimized Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Physical Plan ==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: InMemoryFileIndex[file:/home/cnali/data], ReadSchema: struct<id:double,ts:timestamp>
{code}
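A quick way to check for push-down without reading the whole explain output is
to inspect the physical plan string programmatically. A minimal sketch (it
relies on the internal _jdf/queryExecution accessors, which are not public API
and may change between versions):
{code:python}
# Sketch: fetch the physical plan as a string through the JVM
# QueryExecution object and look for the push-down markers that
# show up in the 2.4 plan above.
plan = df2._jdf.queryExecution().executedPlan().toString()
print("PartitionFilters" in plan)  # True on 2.4; absent from the 3.0 preview plan
print("PushedFilters" in plan)     # filter push-down marker for file sources
{code}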
I have tested this on a much larger dataset: Spark 3.0 loads the whole dataset
and then applies the filter, whereas Spark 2.4 pushes the filter down. The
output above shows that Spark 2.4 applied the partition filter but the Spark
3.0 preview did not.
Minor: in Spark 3.0 the explain output is truncated (maybe to a fixed length?),
which makes debugging harder. Setting
spark.sql.orc.cache.stripe.details.size=10000 doesn't help.
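A possible workaround until this is resolved (an untested sketch; it assumes
parquet can be moved back to the V1 code path via
spark.sql.sources.useV1SourceList, and that spark.sql.debug.maxToStringFields
governs the plan-string truncation):
{code:python}
# Assumption: forcing parquet onto the DataSource V1 path should restore
# the 2.4-style FileScan plan with PartitionFilters/PushedFilters.
spark.conf.set("spark.sql.sources.useV1SourceList", "parquet")

# Assumption: raising this limit lengthens the truncated explain output.
spark.conf.set("spark.sql.debug.maxToStringFields", 1000)
{code}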