[ 
https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16256676#comment-16256676
 ] 

Janne K. Olesen commented on SPARK-22541:
-----------------------------------------

I agree, the filtered results are correct, but that is beside the point. It 
seems like query optimization does something like
{noformat}
  for row in df:
     result_a = filter1(row)
     result_b = filter2(row)
     result = result_a && result_b
{noformat}

but in my opion it should be 
{noformat}
  for row in df:
     result_a = filter1(row)
     if result_a == True:
        return filter2(row)
    else
       return False
{noformat}

If filter2 is executed regardless of the result of filter1, this can lead to 
strange errors. Considering the following example:
{code:title=Example.py}
from pyspark.sql.functions import udf, col
from pyspark.sql.types import BooleanType
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df_input = spark.createDataFrame([("a", None), ("b", 2), ("c", 3)], ["key", 
"val"])

# works as expected
df = df_input.filter(col("val").isNotNull())
df = df.filter(col("val") > 2)
df.show()

# this will raise an error and fail
# TypeError: '>' not supported between instances of 'NoneType' and 'int'
isNotNone = udf(lambda x: x is not None, BooleanType())
filter2 = udf(lambda x: x > 2, BooleanType())
df = df_input.filter(isNotNone(col("val")))
df = df.filter(filter2(col("val")))
df.show()
{code}



> Dataframes: applying multiple filters one after another using udfs and 
> accumulators results in faulty accumulators
> ------------------------------------------------------------------------------------------------------------------
>
>                 Key: SPARK-22541
>                 URL: https://issues.apache.org/jira/browse/SPARK-22541
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.2.0
>         Environment: pyspark 2.2.0, ubuntu
>            Reporter: Janne K. Olesen
>
> I'm using udf filters and accumulators to keep track of filtered rows in 
> dataframes.
> If I'm applying multiple filters one after the other, they seem to be 
> executed in parallel, not in sequence, which messes with the accumulators i'm 
> using to keep track of filtered data. 
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame([("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", 
> "val1", "val2"])
> def __myfilter(val, acc):
>     if val < 2:
>         return True
>     else:
>         acc.add(1)
>     return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
>     return __myfilter(val, acc1)
> def myfilter2(val):
>     return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+----+----+
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value)  # expected 2, is 2 OK
> print("acc2: %s" % acc2.value)  # expected 0, is 2 !!!
> {code}



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscr...@spark.apache.org
For additional commands, e-mail: issues-h...@spark.apache.org

Reply via email to