Stefan Fehrenbach created SPARK-16681:
-----------------------------------------

             Summary: Optimizer changes order of filter predicates involving UDFs, which changes semantics
                 Key: SPARK-16681
                 URL: https://issues.apache.org/jira/browse/SPARK-16681
             Project: Spark
          Issue Type: Bug
    Affects Versions: 2.0.0
            Reporter: Stefan Fehrenbach


For some queries, the optimizer changes the order in which filter predicates are executed. For UDFs that are not total (that is, UDFs that throw on some inputs), this changes the semantics of the query.

A simple example:

Create an input dataset of one record, with one integer field "a" with value 1:
{code:scala}
// One record with a single integer column "a" holding the value 1.
val ds = sparkSession.createDataFrame(Seq(Row(1)).asJava,
  StructType(Seq(StructField("a", IntegerType))))
{code}

Write a query like this:
{code:scala}
val e = ds.filter(column("a").equalTo(2))
      .select(udf(throws, IntegerType)(column("a")).as("foo"))
      .filter(column("foo").leq(2))
{code}

Here {{throws}} is a function that throws for any argument other than 2, for example:
{code:scala}
// Partial function: defined only for 2, throws for every other input.
val throws = (x: Integer) =>
  if (x == 2) 2
  else throw new Exception
{code}

The first filter in the query removes every record whose value is not 2.
Thus, when the UDF in the select is executed, we expect it to run only on values equal to 2.
This is not the case; execution throws:
{noformat}
16/07/22 09:22:15 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.Exception
        at SparkBug$$anonfun$1.apply(SparkBug.scala:18)
        at SparkBug$$anonfun$1.apply(SparkBug.scala:16)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$7$$anon$1.hasNext(WholeStageCodegenExec.scala:357)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:246)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$3.apply(SparkPlan.scala:240)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$23.apply(RDD.scala:774)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:318)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:282)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
        at org.apache.spark.scheduler.Task.run(Task.scala:85)
...
{noformat}
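The gap between the expected and the observed behavior can be reproduced without Spark, using plain Scala collections. This is only an analogy, not Spark code: the object {{ExpectedSemantics}} and its members {{rows}}, {{expected}} and {{reordered}} are hypothetical names. Filtering on {{a == 2}} first means {{throws}} never sees the value 1, while testing the UDF predicate first calls {{throws(1)}}.

{code:scala}
object ExpectedSemantics {
  // Same partial function as in the report: defined only for 2.
  val throws = (x: Integer) =>
    if (x == 2) 2
    else throw new Exception

  // One record whose column "a" holds 1, like LocalRelation [a#0], [[1]].
  val rows: Seq[Integer] = Seq(Integer.valueOf(1))

  // Analyzed order: filter a = 2, apply the UDF, then filter foo <= 2.
  def expected: Seq[Int] =
    rows.filter(a => a == 2).map(a => throws(a)).filter(foo => foo <= 2)

  // Optimized order: the UDF predicate is tested before a = 2.
  def reordered: Seq[Int] =
    rows.filter(a => throws(a) <= 2 && a == 2).map(a => throws(a))

  def main(args: Array[String]): Unit = {
    println(expected)                            // List()
    println(scala.util.Try(reordered).isFailure) // true
  }
}
{code}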

This is what the analyzed logical plan says:
{noformat}
== Analyzed Logical Plan ==
foo: int
Filter (foo#3 <= 2)
+- Project [UDF(a#0) AS foo#3]
   +- Filter (a#0 = 2)
      +- LocalRelation [a#0], [[1]]
{noformat}

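Unfortunately, the optimized logical plan, and hence the physical plan, do not preserve this order: the filter on {{foo}} has been pushed below the projection (with {{foo}} replaced by {{UDF(a#0)}}) and merged with the filter on {{a}}, so {{UDF(a#0) <= 2}} is evaluated before {{a#0 = 2}}: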
{noformat}
== Optimized Logical Plan ==
Project [UDF(a#0) AS foo#3]
+- Filter (((UDF(a#0) <= 2) && isnotnull(a#0)) && (a#0 = 2))
   +- LocalRelation [a#0], [[1]]

== Physical Plan ==
WholeStageCodegen
:  +- Project [UDF(a#0) AS foo#3]
:     +- Filter (((UDF(a#0) <= 2) && isnotnull(a#0)) && (a#0 = 2))
:        +- INPUT
+- LocalTableScan [a#0], [[1]]
{noformat}
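To see why this rewrite changes the result, here is a deliberately tiny model of it in plain Scala. It is not Catalyst's code: the {{Expr}} and {{Plan}} types, the {{optimize}} rule and the evaluator are all hypothetical, and the inferred {{isnotnull(a#0)}} predicate is left out. The rule pushes the outer filter through the projection by substituting the alias, then places the pushed predicate before the existing one, which is the order visible in the optimized plan above. With a left-to-right, short-circuiting AND, only the rewritten plan calls the partial UDF on the value 1.

{code:scala}
object ToyPushdown {
  // Tiny expression language: columns, literals, a partial UDF, <=, = and short-circuit AND.
  sealed trait Expr
  final case class Col(name: String) extends Expr
  final case class Lit(v: Int) extends Expr
  final case class Udf(arg: Expr) extends Expr
  final case class Leq(l: Expr, r: Expr) extends Expr
  final case class Eq(l: Expr, r: Expr) extends Expr
  final case class And(l: Expr, r: Expr) extends Expr

  sealed trait Plan
  final case class Relation(rows: Seq[Map[String, Int]]) extends Plan
  final case class Filter(cond: Expr, child: Plan) extends Plan
  final case class Project(alias: String, expr: Expr, child: Plan) extends Plan

  // Replace references to the projected alias with the expression it names.
  def substitute(e: Expr, alias: String, by: Expr): Expr = e match {
    case Col(`alias`) => by
    case Udf(a)       => Udf(substitute(a, alias, by))
    case Leq(l, r)    => Leq(substitute(l, alias, by), substitute(r, alias, by))
    case Eq(l, r)     => Eq(substitute(l, alias, by), substitute(r, alias, by))
    case And(l, r)    => And(substitute(l, alias, by), substitute(r, alias, by))
    case other        => other
  }

  // Push the outer filter through the project and merge it with the inner filter,
  // putting the pushed-down predicate first (the order seen in the optimized plan).
  def optimize(p: Plan): Plan = p match {
    case Filter(outer, Project(alias, e, Filter(inner, child))) =>
      Project(alias, e, Filter(And(substitute(outer, alias, e), inner), child))
    case other => other
  }

  // Booleans are encoded as 1/0; the UDF mirrors `throws` from the report.
  def eval(e: Expr, row: Map[String, Int]): Int = e match {
    case Col(n)    => row(n)
    case Lit(v)    => v
    case Udf(a)    => val x = eval(a, row); if (x == 2) 2 else throw new Exception
    case Leq(l, r) => if (eval(l, row) <= eval(r, row)) 1 else 0
    case Eq(l, r)  => if (eval(l, row) == eval(r, row)) 1 else 0
    case And(l, r) => if (eval(l, row) == 1) eval(r, row) else 0 // short-circuit
  }

  def run(p: Plan): Seq[Map[String, Int]] = p match {
    case Relation(rows)           => rows
    case Filter(c, child)         => run(child).filter(r => eval(c, r) == 1)
    case Project(alias, e, child) => run(child).map(r => Map(alias -> eval(e, r)))
  }

  // Analyzed plan from the report: Filter(foo <= 2, Project(UDF(a) AS foo, Filter(a = 2, [[1]]))).
  val analyzed: Plan =
    Filter(Leq(Col("foo"), Lit(2)),
      Project("foo", Udf(Col("a")),
        Filter(Eq(Col("a"), Lit(2)), Relation(Seq(Map("a" -> 1))))))
}
{code}

Running {{ToyPushdown.run(ToyPushdown.analyzed)}} returns an empty result, while running the plan produced by {{ToyPushdown.optimize(ToyPushdown.analyzed)}} throws.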

There is a seemingly related bug:
https://issues.apache.org/jira/browse/SPARK-13773
That bug mentions a PR which adds documentation to UDFs saying that they need to be deterministic.
This is not enough! The UDF in the code above is perfectly deterministic: it throws every time it is called with illegal input. The problem is that it is not total.
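Determinism (same input, same outcome) and totality (a result for every input) are different properties, and it is the lack of totality that makes the reordering observable. One user-side workaround, offered here as an assumption and not as a fix for the optimizer, is to make the function total, for example by returning an {{Option[Int]}} instead of throwing. The plain-Scala sketch below uses a hypothetical {{safe}} function and checks that both evaluation orders then agree. In Spark, a UDF returning {{Option}} should surface {{None}} as null, which a filter treats like false, but that mapping is not exercised here.

{code:scala}
object TotalVersion {
  // Total counterpart of `throws`: None instead of an exception for inputs other than 2.
  val safe: Int => Option[Int] = x => if (x == 2) Some(2) else None

  val rows: Seq[Int] = Seq(1, 2)

  // Original order: equality test first, then the UDF predicate.
  def filterFirst: Seq[Int] =
    rows.filter(a => a == 2 && safe(a).exists(_ <= 2))

  // Reordered: UDF predicate first; a None result drops the row, like NULL in a SQL filter.
  def udfFirst: Seq[Int] =
    rows.filter(a => safe(a).exists(_ <= 2) && a == 2)

  def main(args: Array[String]): Unit = {
    println(filterFirst) // List(2)
    println(udfFirst)    // List(2)
  }
}
{code}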

The failing test from above is in a GitHub repository:
https://github.com/fehrenbach/spark-bug/blob/master/src/main/scala/SparkBug.scala
Just clone it and run {{sbt run}}.

I'd be happy to fix this myself, if someone wants to hold my hand doing it. I 
haven't looked at the Spark source code before and would not know where to 
start.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
