[ https://issues.apache.org/jira/browse/SPARK-18712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15724381#comment-15724381 ]

Cheng Lian edited comment on SPARK-18712 at 12/6/16 5:10 AM:
-------------------------------------------------------------

I think the contract here is that for a DataFrame {{df}} and one or more 
consecutive filter predicates applied to {{df}}, each predicate must be a total 
function over the output of {{df}}. Only then can we guarantee that the 
execution order of the filter predicates is irrelevant. This contract is 
important for optimizations like filter push-down: if we had to preserve the 
execution order of all filter predicates, we couldn't push down {{a}} in 
{{a AND b}}, and we would lose lots of optimization opportunities.
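
A minimal sketch of what that merging looks like (assuming a plain SparkContext/SQLContext setup, not code from this report): two consecutive {{where}} calls on the same DataFrame end up as a single conjunctive {{Filter}} in the optimized plan, so neither predicate can assume the other has already been evaluated.

{code}
# Illustrative sketch only: show that consecutive filters are merged.
import pyspark

sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)

df = sqlc.range(10)

# Two consecutive filters over the same DataFrame ...
filtered = df.where(df['id'] > 2).where(df['id'] < 8)

# ... show up as one combined Filter ((id > 2) && (id < 8)) in the optimized
# plan, so Spark is free to evaluate either predicate first.
filtered.explain(True)
{code}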

In the case of the snippet in the JIRA description, the first predicate is a 
total function over the output of the original {{df}}, while the second is only 
a partial function (the UDF raises a {{KeyError}} for ids other than 5 and 6).
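
For illustration, a minimal sketch (assuming the same setup as the snippet in the JIRA description): if the UDF is rewritten as a total function over {{df}}'s output, e.g. by giving the dictionary lookup a default value, the result no longer depends on whether the {{isin}} filter runs first.

{code}
# Hypothetical variant of the reported snippet with a total-function predicate.
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T

sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)

table = {5: True, 6: False}
df = sqlc.range(10)
df = df.where(df['id'].isin(5, 6))

# table.get(x, False) is defined for every id in [0, 10), so this predicate can
# be evaluated before or after the isin() filter with the same final result.
f = F.udf(lambda x: table.get(x, False), T.BooleanType())
df = df.where(f(df['id']))

print(df.count())  # 1 -- only id=5 maps to True
{code}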


> keep the order of sql expression and support short circuit
> ----------------------------------------------------------
>
>                 Key: SPARK-18712
>                 URL: https://issues.apache.org/jira/browse/SPARK-18712
>             Project: Spark
>          Issue Type: Wish
>          Components: SQL
>    Affects Versions: 2.0.2
>         Environment: Ubuntu 16.04
>            Reporter: yahsuan, chang
>
> The following Python code fails with Spark 2.0.2 but works with Spark 1.5.2:
> {code}
> # a.py
> import pyspark
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
> sc = pyspark.SparkContext()
> sqlc = pyspark.SQLContext(sc)
> table = {5: True, 6: False}
> df = sqlc.range(10)
> df = df.where(df['id'].isin(5, 6))
> f = F.udf(lambda x: table[x], T.BooleanType())
> df = df.where(f(df['id']))
> # df.explain(True)
> print(df.count())
> {code}
> Here is the exception:
> {code}
> KeyError: 0
> {code}
> I guess the problem is related to the order of SQL expressions.
> The following are the explain outputs for the two Spark versions:
> {code}
> # explain of spark 2.0.2
> == Parsed Logical Plan ==
> Filter <lambda>(id#0L)
> +- Filter cast(id#0L as bigint) IN (cast(5 as bigint),cast(6 as bigint))
>    +- Range (0, 10, step=1, splits=Some(4))
> == Analyzed Logical Plan ==
> id: bigint
> Filter <lambda>(id#0L)
> +- Filter cast(id#0L as bigint) IN (cast(5 as bigint),cast(6 as bigint))
>    +- Range (0, 10, step=1, splits=Some(4))
> == Optimized Logical Plan ==
> Filter (id#0L IN (5,6) && <lambda>(id#0L))
> +- Range (0, 10, step=1, splits=Some(4))
> == Physical Plan ==
> *Project [id#0L]
> +- *Filter (id#0L IN (5,6) && pythonUDF0#5)
>    +- BatchEvalPython [<lambda>(id#0L)], [id#0L, pythonUDF0#5]
>       +- *Range (0, 10, step=1, splits=Some(4))
> {code}
> {code}
> # explain of spark 1.5.2
> == Parsed Logical Plan ==
> 'Project [*,PythonUDF#<lambda>(id#0L) AS sad#1]
>  Filter id#0L IN (cast(5 as bigint),cast(6 as bigint))
>   LogicalRDD [id#0L], MapPartitionsRDD[3] at range at NativeMethodAccessorImpl.java:-2
> == Analyzed Logical Plan ==
> id: bigint, sad: int
> Project [id#0L,sad#1]
>  Project [id#0L,pythonUDF#2 AS sad#1]
>   EvaluatePython PythonUDF#<lambda>(id#0L), pythonUDF#2
>    Filter id#0L IN (cast(5 as bigint),cast(6 as bigint))
>     LogicalRDD [id#0L], MapPartitionsRDD[3] at range at NativeMethodAccessorImpl.java:-2
> == Optimized Logical Plan ==
> Project [id#0L,pythonUDF#2 AS sad#1]
>  EvaluatePython PythonUDF#<lambda>(id#0L), pythonUDF#2
>   Filter id#0L IN (5,6)
>    LogicalRDD [id#0L], MapPartitionsRDD[3] at range at NativeMethodAccessorImpl.java:-2
> == Physical Plan ==
> TungstenProject [id#0L,pythonUDF#2 AS sad#1]
>  !BatchPythonEvaluation PythonUDF#<lambda>(id#0L), [id#0L,pythonUDF#2]
>   Filter id#0L IN (5,6)
>    Scan PhysicalRDD[id#0L]
> Code Generation: true
> {code}
> Also, I am not sure whether SQL expressions support short-circuit evaluation, so I did the following experiment:
> {code}
> import pyspark
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
> sc = pyspark.SparkContext()
> sqlc = pyspark.SQLContext(sc)
> def f(x):
>     print('in f')
>     return True
> f_udf = F.udf(f, T.BooleanType())
> df = sqlc.createDataFrame([(1, 2)], schema=['a', 'b'])
> df = df.where(f_udf('a') | f_udf('b'))
> df.show()
> {code}
> I got the following output for both Spark 1.5.2 and Spark 2.0.2:
> {code}
> in f
> in f
> +---+---+
> |  a|  b|
> +---+---+
> |  1|  2|
> +---+---+
> {code}
> There is only one row in the DataFrame df, but the function f has been called twice, so I guess there is no short-circuit evaluation.
> The result seems to conflict with SPARK-1461.
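
A possible workaround for the short-circuit experiment above, sketched under the assumption that both checks can be merged into one UDF (this is not code from the original report): since the experiment shows that both UDF invocations are evaluated anyway, the short circuit can instead happen inside a single Python UDF, where Python's {{or}} stops after the first truthy operand.

{code}
# Hypothetical workaround: do the short-circuiting inside one Python UDF.
import pyspark
import pyspark.sql.functions as F
import pyspark.sql.types as T

sc = pyspark.SparkContext()
sqlc = pyspark.SQLContext(sc)

def f(x):
    print('in f')
    return True

# Python's `or` short-circuits, so f(b) is never called once f(a) returns True.
combined = F.udf(lambda a, b: f(a) or f(b), T.BooleanType())

df = sqlc.createDataFrame([(1, 2)], schema=['a', 'b'])
df = df.where(combined('a', 'b'))
df.show()
{code}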


