[
https://issues.apache.org/jira/browse/SPARK-18712?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15724391#comment-15724391
]
Wenchen Fan commented on SPARK-18712:
-------------------------------------
Spark SQL makes no guarantee about the execution order of filter conditions,
as long as the conditions are deterministic. Since a UDF must be
deterministic, I'm afraid you can't write a filter-condition UDF that depends
on previous conditions. A workaround is to embed the previous conditions in
your UDF.
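A minimal sketch of that workaround in plain Python (no Spark required; `table`, `safe`, and `unsafe` are illustrative names, not part of any Spark API):

```python
# Illustrative sketch: since the engine may evaluate the UDF before (or
# together with) the earlier filter, the UDF itself must tolerate every
# possible input by embedding the previous condition.
table = {5: True, 6: False}

def unsafe(x):
    # Relies on a prior filter having restricted x to {5, 6}:
    return table[x]          # raises KeyError for any other id

def safe(x):
    # Previous condition (membership in `table`) embedded in the UDF itself:
    return x in table and table[x]

# `safe` works no matter which order the conditions are evaluated in:
result = [x for x in range(10) if safe(x)]
```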
The code snippet works in 1.5 but fails in 2.0 because of an issue in 2.0:
the `PushDownPredicates` rule doesn't work for `BatchEvalPythonExec` anymore.
Anyone wanna work on it? cc [~smilegator] [~dongjoon]
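The reported `KeyError: 0` can be sketched in plain Python (no Spark needed). This is only an analogy for the plans in the quoted issue, assuming the 2.0.2 physical plan batch-evaluates the Python UDF over all rows before the combined filter runs:

```python
# Plain-Python analogy of the two evaluation orders. `table` mirrors the
# reporter's snippet; no Spark API is involved.
table = {5: True, 6: False}
rows = list(range(10))

# 1.5-style order: IN-filter applied first, then the UDF -- succeeds.
ok = [x for x in rows if x in (5, 6) and table[x]]

# 2.0.2-style order: the UDF is evaluated for every row before the
# combined filter is applied -- id 0 reaches the lambda and raises KeyError.
err = None
try:
    evaluated = [(x, table[x]) for x in rows]
except KeyError as e:
    err = e
```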
> keep the order of sql expression and support short circuit
> ----------------------------------------------------------
>
> Key: SPARK-18712
> URL: https://issues.apache.org/jira/browse/SPARK-18712
> Project: Spark
> Issue Type: Wish
> Components: SQL
> Affects Versions: 2.0.2
> Environment: Ubuntu 16.04
> Reporter: yahsuan, chang
>
> The following python code fails with spark 2.0.2, but works with spark 1.5.2
> {code}
> # a.py
> import pyspark
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
> sc = pyspark.SparkContext()
> sqlc = pyspark.SQLContext(sc)
> table = {5: True, 6: False}
> df = sqlc.range(10)
> df = df.where(df['id'].isin(5, 6))
> f = F.udf(lambda x: table[x], T.BooleanType())
> df = df.where(f(df['id']))
> # df.explain(True)
> print(df.count())
> {code}
> Here is the exception:
> {code}
> KeyError: 0
> {code}
> I guess the problem is about the order of the sql expressions.
> The following are the explain outputs of the two spark versions:
> {code}
> # explain of spark 2.0.2
> == Parsed Logical Plan ==
> Filter <lambda>(id#0L)
> +- Filter cast(id#0L as bigint) IN (cast(5 as bigint),cast(6 as bigint))
> +- Range (0, 10, step=1, splits=Some(4))
> == Analyzed Logical Plan ==
> id: bigint
> Filter <lambda>(id#0L)
> +- Filter cast(id#0L as bigint) IN (cast(5 as bigint),cast(6 as bigint))
> +- Range (0, 10, step=1, splits=Some(4))
> == Optimized Logical Plan ==
> Filter (id#0L IN (5,6) && <lambda>(id#0L))
> +- Range (0, 10, step=1, splits=Some(4))
> == Physical Plan ==
> *Project [id#0L]
> +- *Filter (id#0L IN (5,6) && pythonUDF0#5)
> +- BatchEvalPython [<lambda>(id#0L)], [id#0L, pythonUDF0#5]
> +- *Range (0, 10, step=1, splits=Some(4))
> {code}
> {code}
> # explain of spark 1.5.2
> == Parsed Logical Plan ==
> 'Project [*,PythonUDF#<lambda>(id#0L) AS sad#1]
> Filter id#0L IN (cast(5 as bigint),cast(6 as bigint))
> LogicalRDD [id#0L], MapPartitionsRDD[3] at range at
> NativeMethodAccessorImpl.java:-2
> == Analyzed Logical Plan ==
> id: bigint, sad: int
> Project [id#0L,sad#1]
> Project [id#0L,pythonUDF#2 AS sad#1]
> EvaluatePython PythonUDF#<lambda>(id#0L), pythonUDF#2
> Filter id#0L IN (cast(5 as bigint),cast(6 as bigint))
> LogicalRDD [id#0L], MapPartitionsRDD[3] at range at
> NativeMethodAccessorImpl.java:-2
> == Optimized Logical Plan ==
> Project [id#0L,pythonUDF#2 AS sad#1]
> EvaluatePython PythonUDF#<lambda>(id#0L), pythonUDF#2
> Filter id#0L IN (5,6)
> LogicalRDD [id#0L], MapPartitionsRDD[3] at range at
> NativeMethodAccessorImpl.java:-2
> == Physical Plan ==
> TungstenProject [id#0L,pythonUDF#2 AS sad#1]
> !BatchPythonEvaluation PythonUDF#<lambda>(id#0L), [id#0L,pythonUDF#2]
> Filter id#0L IN (5,6)
> Scan PhysicalRDD[id#0L]
> Code Generation: true
> {code}
> Also, I am not sure whether sql expressions support short-circuit
> evaluation, so I did the following experiment:
> {code}
> import pyspark
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
> sc = pyspark.SparkContext()
> sqlc = pyspark.SQLContext(sc)
> def f(x):
>     print('in f')
>     return True
> f_udf = F.udf(f, T.BooleanType())
> df = sqlc.createDataFrame([(1, 2)], schema=['a', 'b'])
> df = df.where(f_udf('a') | f_udf('b'))
> df.show()
> {code}
> and I got the following output for both spark 1.5.2 and spark 2.0.2
> {code}
> in f
> in f
> +---+---+
> | a| b|
> +---+---+
> | 1| 2|
> +---+---+
> {code}
> There is only one element in dataframe df, but the function f has been
> called twice, so I guess there is no short-circuit evaluation.
> This result seems to conflict with SPARK-1461.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)