[ 
https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nasir Ali updated SPARK-30162:
------------------------------
    Description: 
Filters are not pushed down in Spark 3.0 preview. Also the output of "explain" 
method is different. It is hard to debug in 3.0 whether filters were pushed 
down or not. Below code could reproduce the bug:

 
{code:java}
// code placeholder
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
                            ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
                            ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
                            ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
                            ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
                            ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
                            ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
                            ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2",25.00, "2018-03-16T11:27:18+00:00")
                            ],
                           ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = 
spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
ReadSchema: struct<id:double,ts:timestamp>{code}
{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical Plan 
==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
struct<id:double,ts:timestamp>
{code}
I have tested it on much larger dataset. Spark 3.0 tries to load whole data and 
then apply filter. Whereas Spark 2.4 push down the filter. Above output shows 
that Spark 2.4 applied partition filter but not the Spark 3.0 preview.

 

Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and 
it's hard to debug.  spark.sql.orc.cache.stripe.details.size=10000 doesn't work.

 
{code:java}
// pyspark 3 shell output
$ pyspark
Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
Warning: Ignoring non-spark config property: 
java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8
19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library 
for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
19/12/09 07:05:36 WARN SparkConf: Note that spark.local.dir will be overridden 
by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.0.0-preview
      /_/Using Python version 3.6.8 (default, Aug  7 2019 17:28:10)
SparkSession available as 'spark'.
{code}
{code:java}
// pyspark 2.4.4 shell output
pyspark
Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
[GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
Type "help", "copyright", "credits" or "license" for more information.
2019-12-09 07:09:07 WARN  NativeCodeLoader:62 - Unable to load native-hadoop 
library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
setLogLevel(newLevel).
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/Using Python version 3.6.8 (default, Aug  7 2019 17:28:10)
SparkSession available as 'spark'.

{code}
 

 

  was:
Filters are not pushed down in Spark 3.0 preview. Also the output of "explain" 
method is different. It is hard to debug in 3.0 whether filters were pushed 
down or not. Below code could reproduce the bug:

 
{code:java}
// code placeholder
df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
                            ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
                            ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
                            ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
                            ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
                            ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
                            ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
                            ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2",25.00, "2018-03-16T11:27:18+00:00")
                            ],
                           ["user","id", "ts"])
df = df.withColumn('ts', df.ts.cast('timestamp'))
df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = 
spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True)
{code}
{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
ReadSchema: struct<id:double,ts:timestamp>{code}
{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical Plan 
==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
struct<id:double,ts:timestamp>
{code}
I have tested it on much larger dataset. Spark 3.0 tries to load whole data and 
then apply filter. Whereas Spark 2.4 push down the filter. Above output shows 
that Spark 2.4 applied partition filter but not the Spark 3.0 preview.

 

Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and 
it's hard to debug.  spark.sql.orc.cache.stripe.details.size=10000 doesn't work.

 


> Filter is not being pushed down for Parquet files
> -------------------------------------------------
>
>                 Key: SPARK-30162
>                 URL: https://issues.apache.org/jira/browse/SPARK-30162
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>         Environment: pyspark 3.0 preview
> Ubuntu/Centos
> pyarrow 0.14.1 
>            Reporter: Nasir Ali
>            Priority: Major
>
> Filters are not pushed down in Spark 3.0 preview. Also the output of 
> "explain" method is different. It is hard to debug in 3.0 whether filters 
> were pushed down or not. Below code could reproduce the bug:
>  
> {code:java}
> // code placeholder
> df = spark.createDataFrame([("usr1",17.00, "2018-03-10T15:27:18+00:00"),
>                             ("usr1",13.00, "2018-03-11T12:27:18+00:00"),
>                             ("usr1",25.00, "2018-03-12T11:27:18+00:00"),
>                             ("usr1",20.00, "2018-03-13T15:27:18+00:00"),
>                             ("usr1",17.00, "2018-03-14T12:27:18+00:00"),
>                             ("usr2",99.00, "2018-03-15T11:27:18+00:00"),
>                             ("usr2",156.00, "2018-03-22T11:27:18+00:00"),
>                             ("usr2",17.00, "2018-03-31T11:27:18+00:00"),
>                             ("usr2",25.00, "2018-03-15T11:27:18+00:00"),
>                             ("usr2",25.00, "2018-03-16T11:27:18+00:00")
>                             ],
>                            ["user","id", "ts"])
> df = df.withColumn('ts', df.ts.cast('timestamp'))
> df.write.partitionBy("user").parquet("/home/cnali/data/")df2 = 
> spark.read.load("/home/cnali/data/")df2.filter("user=='usr2'").explain(True)
> {code}
> {code:java}
> // Spark 2.4 output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Analyzed Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#40 = usr2)
> +- Relation[id#38,ts#39,user#40] parquet== Optimized Logical Plan ==
> Filter (isnotnull(user#40) && (user#40 = usr2))
> +- Relation[id#38,ts#39,user#40] parquet== Physical Plan ==
> *(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, 
> Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, 
> PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], 
> ReadSchema: struct<id:double,ts:timestamp>{code}
> {code:java}
> // Spark 3.0.0-preview output
> == Parsed Logical Plan ==
> 'Filter ('user = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Analyzed 
> Logical Plan ==
> id: double, ts: timestamp, user: string
> Filter (user#2 = usr2)
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Optimized 
> Logical Plan ==
> Filter (isnotnull(user#2) AND (user#2 = usr2))
> +- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data== Physical 
> Plan ==
> *(1) Project [id#0, ts#1, user#2]
> +- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
>    +- *(1) ColumnarToRow
>       +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: 
> InMemoryFileIndex[file:/home/cnali/data], ReadSchema: 
> struct<id:double,ts:timestamp>
> {code}
> I have tested it on much larger dataset. Spark 3.0 tries to load whole data 
> and then apply filter. Whereas Spark 2.4 push down the filter. Above output 
> shows that Spark 2.4 applied partition filter but not the Spark 3.0 preview.
>  
> Minor: in Spark 3.0 "explain()" output is truncated (maybe fixed length?) and 
> it's hard to debug.  spark.sql.orc.cache.stripe.details.size=10000 doesn't 
> work.
>  
> {code:java}
> // pyspark 3 shell output
> $ pyspark
> Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
> [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
> Type "help", "copyright", "credits" or "license" for more information.
> Warning: Ignoring non-spark config property: 
> java.io.dir=/md2k/data1,/md2k/data2,/md2k/data3,/md2k/data4,/md2k/data5,/md2k/data6,/md2k/data7,/md2k/data8
> 19/12/09 07:05:36 WARN NativeCodeLoader: Unable to load native-hadoop library 
> for your platform... using builtin-java classes where applicable
> Using Spark's default log4j profile: 
> org/apache/spark/log4j-defaults.properties
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> 19/12/09 07:05:36 WARN SparkConf: Note that spark.local.dir will be 
> overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in 
> mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 3.0.0-preview
>       /_/Using Python version 3.6.8 (default, Aug  7 2019 17:28:10)
> SparkSession available as 'spark'.
> {code}
> {code:java}
> // pyspark 2.4.4 shell output
> pyspark
> Python 3.6.8 (default, Aug  7 2019, 17:28:10) 
> [GCC 4.8.5 20150623 (Red Hat 4.8.5-39)] on linux
> Type "help", "copyright", "credits" or "license" for more information.
> 2019-12-09 07:09:07 WARN  NativeCodeLoader:62 - Unable to load native-hadoop 
> library for your platform... using builtin-java classes where applicable
> Setting default log level to "WARN".
> To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use 
> setLogLevel(newLevel).
> Welcome to
>       ____              __
>      / __/__  ___ _____/ /__
>     _\ \/ _ \/ _ `/ __/  '_/
>    /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
>       /_/Using Python version 3.6.8 (default, Aug  7 2019 17:28:10)
> SparkSession available as 'spark'.
> {code}
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to