[
https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16258868#comment-16258868
]
Liang-Chi Hsieh edited comment on SPARK-22541 at 11/20/17 7:01 AM:
-------------------------------------------------------------------
Sorry, my previous reply is not completely correct.
This behavior is also related to how PySpark runs python udfs. We can try to
show the query plan of the {{df}}:
{code}
...
== Physical Plan ==
*Project [key#0, val1#1L, val2#2L]
+- *Filter (pythonUDF0#21 && pythonUDF1#22)
+- BatchEvalPython [myfilter1(val1#1L), myfilter2(val2#2L)], [key#0,
val1#1L, val2#2L, pythonUDF0#21, pythonUDF1#22]
+- Scan ExistingRDD[key#0,val1#1L,val2#2L]
{code}
The python udfs are pushed down to a special physical operator
{{BatchEvalPython}} to execute. Due to the implementation details, the pushed
down python udfs are not conditional. That's said they are evaluated on all
rows, even logically in the original query they are only evaluated on part of
rows by using some conditional expressions such as when or if. The issue you
found here is also the same reason.
was (Author: viirya):
Sorry, my previous reply is not completely correct.
This behavior is related to how PySpark runs python udfs. We can try to show
the query plan of the {{df}}:
{code}
...
== Physical Plan ==
*Project [key#0, val1#1L, val2#2L]
+- *Filter (pythonUDF0#21 && pythonUDF1#22)
+- BatchEvalPython [myfilter1(val1#1L), myfilter2(val2#2L)], [key#0,
val1#1L, val2#2L, pythonUDF0#21, pythonUDF1#22]
+- Scan ExistingRDD[key#0,val1#1L,val2#2L]
{code}
The python udfs are pushed down to a special physical operator
{{BatchEvalPython}} to execute. Due to the implementation details, the pushed
down python udfs are not conditional. That's said they are evaluated on all
rows, even logically in the original query they are only evaluated on part of
rows by using some conditional expressions such as when or if. The issue you
found here is also the same reason.
> Dataframes: applying multiple filters one after another using udfs and
> accumulators results in faulty accumulators
> ------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
> Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be
> executed in parallel, not in sequence, which messes with the accumulators i'm
> using to keep track of filtered data.
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key",
> "val1", "val2"])
> def __myfilter(val, acc):
> if val < 2:
> return True
> else:
> acc.add(1)
> return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
> return __myfilter(val, acc1)
> def myfilter2(val):
> return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # | a| 1| 1|
> # | b| 2| 2|
> # | c| 3| 3|
> # +---+----+----+
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # | a| 1| 1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # | a| 1| 1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value) # expected 2, is 2 OK
> print("acc2: %s" % acc2.value) # expected 0, is 2 !!!
> {code}
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]