Josh Rosen created SPARK-27969: ---------------------------------- Summary: Non-deterministic expressions in filters or projects can unnecessarily prevent all scan-time column pruning, harming performance Key: SPARK-27969 URL: https://issues.apache.org/jira/browse/SPARK-27969 Project: Spark Issue Type: Bug Components: SQL Affects Versions: 2.4.0 Reporter: Josh Rosen
If a scan operator is followed by a projection or filter and those operators contain _any_ non-deterministic expressions then scan column pruning optimizations are completely skipped, harming query performance. Here's an example of the problem: {code:java} import org.apache.spark.sql.functions._ val df = spark.createDataset(Seq( (1, 2, 3, 4, 5), (1, 2, 3, 4, 5) )) val tmpPath = java.nio.file.Files.createTempDirectory("column-pruning-bug").toString() df.write.parquet(tmpPath) val fromParquet = spark.read.parquet(tmpPath){code} If all expressions are deterministic then, as expected, column pruning is pushed into the scan {code:java} fromParquet.select("_1").explain == Physical Plan == *(1) FileScan parquet [_1#68] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug7865798834978814548], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int>{code} However, if we add a non-deterministic filter then no column pruning is performed (even though pruning would be safe!): {code:java} fromParquet.select("_1").filter(rand() =!= 0).explain == Physical Plan == *(1) Project [_1#127] +- *(1) Filter NOT (rand(-1515289268025792238) = 0.0) +- *(1) FileScan parquet [_1#127,_2#128,_3#129,_4#130,_5#131] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug4043817424882943496], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int,_2:int,_3:int,_4:int,_5:int>{code} Similarly, a non-deterministic expression in a second projection can end up being collapsed such that it prevents column pruning: {code:java} fromParquet.select("_1").select($"_1", rand()).explain == Physical Plan == *(1) Project [_1#127, rand(1267140591146561563) AS rand(1267140591146561563)#141] +- *(1) FileScan parquet [_1#127,_2#128,_3#129,_4#130,_5#131] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[dbfs:/local_disk0/tmp/column-pruning-bug4043817424882943496], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<_1:int,_2:int,_3:int,_4:int,_5:int> {code} I believe that this is caused by SPARK-10316: the Parquet column pruning code relies on the [{{PhysicalOperation}} unapply method|https://github.com/apache/spark/blob/v2.4.3/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/planning/patterns.scala#L43] for extracting projects and filters and this helper purposely fails to match if _any_ projection or filter is non-deterministic. It looks like this conservative behavior may have originally been added to avoid pushdown / re-ordering of non-deterministic filter expressions. However, in this case I feel that it's _too_ conservative: even though we can't push down non-deterministic filters we should still be able to perform column pruning. /cc [~cloud_fan] and [~marmbrus] (it looks like you [discussed collapsing of non-deterministic projects|https://github.com/apache/spark/pull/8486#issuecomment-136036533] in the SPARK-10316 PR, which is related to why the third example above did not prune). -- This message was sent by Atlassian JIRA (v7.6.3#76005) --------------------------------------------------------------------- To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org For additional commands, e-mail: issues-h...@spark.apache.org