[
https://issues.apache.org/jira/browse/SPARK-22541?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Wenchen Fan reassigned SPARK-22541:
-----------------------------------
Assignee: Liang-Chi Hsieh
> Dataframes: applying multiple filters one after another using udfs and
> accumulators results in faulty accumulators
> ------------------------------------------------------------------------------------------------------------------
>
> Key: SPARK-22541
> URL: https://issues.apache.org/jira/browse/SPARK-22541
> Project: Spark
> Issue Type: Documentation
> Components: PySpark
> Affects Versions: 2.2.0
> Environment: pyspark 2.2.0, ubuntu
> Reporter: Janne K. Olesen
> Assignee: Liang-Chi Hsieh
> Fix For: 2.3.0
>
>
> I'm using UDF filters and accumulators to keep track of filtered rows in
> dataframes.
> If I apply multiple filters one after the other, they seem to be
> executed in parallel, not in sequence, which messes with the accumulators I'm
> using to keep track of filtered data.
> {code:title=example.py|borderStyle=solid}
> from pyspark.sql.functions import udf, col
> from pyspark.sql.types import BooleanType
> from pyspark.sql import SparkSession
> spark = SparkSession.builder.getOrCreate()
> sc = spark.sparkContext
> df = spark.createDataFrame(
>     [("a", 1, 1), ("b", 2, 2), ("c", 3, 3)], ["key", "val1", "val2"])
> def __myfilter(val, acc):
>     if val < 2:
>         return True
>     else:
>         acc.add(1)
>         return False
> acc1 = sc.accumulator(0)
> acc2 = sc.accumulator(0)
> def myfilter1(val):
>     return __myfilter(val, acc1)
> def myfilter2(val):
>     return __myfilter(val, acc2)
> my_udf1 = udf(myfilter1, BooleanType())
> my_udf2 = udf(myfilter2, BooleanType())
> df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # |  b|   2|   2|
> # |  c|   3|   3|
> # +---+----+----+
> df = df.filter(my_udf1(col("val1")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df = df.filter(my_udf2(col("val2")))
> # df.show()
> # +---+----+----+
> # |key|val1|val2|
> # +---+----+----+
> # |  a|   1|   1|
> # +---+----+----+
> # expected acc1: 2
> # expected acc2: 0
> df.show()
> print("acc1: %s" % acc1.value) # expected 2, is 2 OK
> print("acc2: %s" % acc2.value) # expected 0, is 2 !!!
> {code}
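
The counts reported above can be reproduced without Spark. The following is only a plain-Python sketch, not Spark's execution model: it assumes that both predicates end up being evaluated for every input row, which is consistent with the observed values (acc1 = 2, acc2 = 2) but is not confirmed by this message. The helper names (make_filter, run_in_sequence, run_both_on_every_row) are hypothetical and do not come from the report.

```python
# Plain-Python model of the reported counts; no Spark involved.
rows = [("a", 1, 1), ("b", 2, 2), ("c", 3, 3)]


def make_filter(counter):
    """Return a predicate that keeps val < 2 and counts rejected rows."""
    def predicate(val):
        if val < 2:
            return True
        counter[0] += 1  # stands in for acc.add(1)
        return False
    return predicate


def run_in_sequence(rows):
    """The second predicate only sees rows that passed the first."""
    acc1, acc2 = [0], [0]
    f1, f2 = make_filter(acc1), make_filter(acc2)
    kept = [r for r in rows if f1(r[1])]
    kept = [r for r in kept if f2(r[2])]
    return kept, acc1[0], acc2[0]


def run_both_on_every_row(rows):
    """Assumed behaviour: both predicates run on every row, then combine."""
    acc1, acc2 = [0], [0]
    f1, f2 = make_filter(acc1), make_filter(acc2)
    kept = []
    for r in rows:
        ok1 = f1(r[1])
        ok2 = f2(r[2])  # evaluated even when ok1 is False
        if ok1 and ok2:
            kept.append(r)
    return kept, acc1[0], acc2[0]


print(run_in_sequence(rows))        # ([('a', 1, 1)], 2, 0)
print(run_both_on_every_row(rows))  # ([('a', 1, 1)], 2, 2)
```

Both variants keep the same row, so the filtered dataframe looks correct either way; only the side-effect counters differ, which is why the discrepancy shows up in acc2 alone.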
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)