[ https://issues.apache.org/jira/browse/SPARK-30162?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Nasir Ali updated SPARK-30162:
------------------------------
    Description:
Filters are not pushed down in the Spark 3.0 preview. The output of the "explain" method also differs from Spark 2.4, which makes it hard to tell in 3.0 whether filters were pushed down. The following code reproduces the bug:

{code:python}
# In the pyspark shell, `spark` (the SparkSession) is already defined.
df = spark.createDataFrame([("usr1", 17.00, "2018-03-10T15:27:18+00:00"),
                            ("usr1", 13.00, "2018-03-11T12:27:18+00:00"),
                            ("usr1", 25.00, "2018-03-12T11:27:18+00:00"),
                            ("usr1", 20.00, "2018-03-13T15:27:18+00:00"),
                            ("usr1", 17.00, "2018-03-14T12:27:18+00:00"),
                            ("usr2", 99.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2", 156.00, "2018-03-22T11:27:18+00:00"),
                            ("usr2", 17.00, "2018-03-31T11:27:18+00:00"),
                            ("usr2", 25.00, "2018-03-15T11:27:18+00:00"),
                            ("usr2", 25.00, "2018-03-16T11:27:18+00:00")
                            ],
                           ["user", "id", "ts"])
# Cast the ISO-8601 strings to timestamps.
df = df.withColumn('ts', df.ts.cast('timestamp'))
# Write one directory per user, then read the data back.
df.write.partitionBy("user").parquet("/home/nasir/data/")
df2 = spark.read.load("/home/nasir/data/")
# Filter on the partition column and print all plans.
df2.filter("user=='usr2'").explain(True)
{code}

{code:java}
// Spark 2.4 output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#40 = usr2)
+- Relation[id#38,ts#39,user#40] parquet

== Optimized Logical Plan ==
Filter (isnotnull(user#40) && (user#40 = usr2))
+- Relation[id#38,ts#39,user#40] parquet

== Physical Plan ==
*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], ReadSchema: struct<id:double,ts:timestamp>
{code}

{code:java}
// Spark 3.0.0-preview output
== Parsed Logical Plan ==
'Filter ('user = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Analyzed Logical Plan ==
id: double, ts: timestamp, user: string
Filter (user#2 = usr2)
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Optimized Logical Plan ==
Filter (isnotnull(user#2) AND (user#2 = usr2))
+- RelationV2[id#0, ts#1, user#2] parquet file:/home/cnali/data

== Physical Plan ==
*(1) Project [id#0, ts#1, user#2]
+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))
   +- *(1) ColumnarToRow
      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: InMemoryFileIndex[file:/home/cnali/data], ReadSchema: struct<id:double,ts:timestamp>
{code}

I have tested this on a much larger dataset. Spark 3.0 tries to load the whole dataset and then applies the filter, whereas Spark 2.4 pushes the filter down. The output above shows that Spark 2.4 applied the partition filter but the Spark 3.0 preview did not.

Minor: in Spark 3.0 the explain output is truncated (maybe to a fixed length?), which makes it hard to debug. Setting spark.sql.orc.cache.stripe.details.size=10000 doesn't help.

  was:
The same description, except that the reproduction script wrote to and read from /home/cnali/data/ instead of /home/nasir/data/.

> Filter is not being pushed down for Parquet files
> -------------------------------------------------
>
>                 Key: SPARK-30162
>                 URL: https://issues.apache.org/jira/browse/SPARK-30162
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.0.0
>         Environment: pyspark 3.0 preview
> Ubuntu/Centos
> pyarrow 0.14.1
>            Reporter: Nasir Ali
>            Priority: Major
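Since the report turns on whether the scan node in the physical plan lists a partition filter, the comparison can be sketched as a small text check. This is only an illustration: the helper names `reported_filters` and `has_partition_filter` are hypothetical (not part of Spark), the two plan strings are copied from the physical plans in the description, and a missing key only shows that the plan text does not report the filter, which may also be a side effect of the truncated 3.0 output.

```python
import re

# Physical-plan text copied from the Spark 2.4 output in the description.
PLAN_2_4 = (
    "*(1) FileScan parquet [id#38,ts#39,user#40] Batched: true, Format: Parquet, "
    "Location: InMemoryFileIndex[file:/home/cnali/data], PartitionCount: 1, "
    "PartitionFilters: [isnotnull(user#40), (user#40 = usr2)], PushedFilters: [], "
    "ReadSchema: struct<id:double,ts:timestamp>"
)

# Physical-plan text copied from the Spark 3.0.0-preview output in the description.
PLAN_3_0 = (
    "*(1) Project [id#0, ts#1, user#2]\n"
    "+- *(1) Filter (isnotnull(user#2) AND (user#2 = usr2))\n"
    "   +- *(1) ColumnarToRow\n"
    "      +- BatchScan[id#0, ts#1, user#2] ParquetScan Location: "
    "InMemoryFileIndex[file:/home/cnali/data], ReadSchema: struct<id:double,ts:timestamp>"
)


def reported_filters(plan_text):
    """Map each filter key to the text inside its brackets, or None if the key is absent.

    Hypothetical helper: it assumes the filter lists contain no nested square brackets,
    which holds for the plans quoted in this issue.
    """
    result = {}
    for key in ("PartitionFilters", "PushedFilters"):
        match = re.search(key + r": \[(.*?)\]", plan_text)
        result[key] = match.group(1) if match else None
    return result


def has_partition_filter(plan_text):
    """True only when the plan text lists a non-empty PartitionFilters entry."""
    return bool(reported_filters(plan_text)["PartitionFilters"])


print("2.4:", reported_filters(PLAN_2_4))
print("3.0 preview:", reported_filters(PLAN_3_0))
```

For the two plans above, the 2.4 text reports a non-empty `PartitionFilters` list and an empty `PushedFilters` list, while the 3.0 preview text contains neither key, which matches the difference described in the report.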