[ https://issues.apache.org/jira/browse/SPARK-25591?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Abdeali Kothari updated SPARK-25591:
------------------------------------
    Description: 
When applying multiple Python UDFs in the same query, only the last Python UDF's accumulator gets updated.


{code:python}
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

spark = SparkSession.builder.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
test_accum = spark.sparkContext.accumulator(0.0)

SHUFFLE = False

def main(data):
    print(">>> Check0", test_accum.value)
    # every call adds 1.0 to the shared accumulator
    def test(x):
        global test_accum
        test_accum += 1.0
        return x

    print(">>> Check1", test_accum.value)

    # every call adds 100.0 to the shared accumulator
    def test2(x):
        global test_accum
        test_accum += 100.0
        return x

    print(">>> Check2", test_accum.value)
    func_udf = F.udf(test, T.DoubleType())
    print(">>> Check3", test_accum.value)
    func_udf2 = F.udf(test2, T.DoubleType())
    print(">>> Check4", test_accum.value)

    data = data.withColumn("out1", func_udf(data["a"]))
    if SHUFFLE:
        data = data.repartition(2)
    print(">>> Check5", test_accum.value)
    data = data.withColumn("out2", func_udf2(data["b"]))
    if SHUFFLE:
        data = data.repartition(2)
    print(">>> Check6", test_accum.value)

    data.show()  # ACTION
    print(">>> Check7", test_accum.value)
    return data


df = spark.createDataFrame([
    [1.0, 2.0]
], schema=T.StructType([
    T.StructField(field_name, T.DoubleType(), True) for field_name in ["a", "b"]
]))

df2 = main(df)
{code}


{code:python}
######## Output 1 - with SHUFFLE=False
...
# >>> Check7 100.0


######## Output 2 - with SHUFFLE=True
...
# >>> Check7 101.0
{code}
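
For reference, each UDF should fire exactly once on the single input row, so both increments would be expected to reach the accumulator once the action finishes (a short worked check using only the numbers from the repro above):

{code:python}
# Expected value of Check7 after the action on the one-row DataFrame:
#   test()  runs once -> +1.0
#   test2() runs once -> +100.0
expected_check7 = 1.0 + 100.0   # 101.0
# Observed: 100.0 with SHUFFLE=False (the update from test() is lost),
#           101.0 with SHUFFLE=True (both updates arrive).
{code}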

In short, it looks like:
 - The accumulator is only updated for the last Python UDF evaluated before a shuffle-like operation (see the workaround sketch below)
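
Based on the SHUFFLE=True run, a possible workaround (an assumption inferred from the observed behaviour, not a verified fix) is to force a shuffle boundary between the two Python UDF applications so they are not evaluated in the same stage. The run above repartitions after each UDF and reports 101.0; whether a single repartition between the two UDFs is enough is an assumption in the sketch below.

{code:python}
# Hypothetical workaround sketch: separate the two Python UDFs with a shuffle.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

spark = SparkSession.builder.getOrCreate()
acc = spark.sparkContext.accumulator(0.0)

def add_one(x):
    global acc
    acc += 1.0      # first UDF's contribution
    return x

def add_hundred(x):
    global acc
    acc += 100.0    # second UDF's contribution
    return x

udf_one = F.udf(add_one, T.DoubleType())
udf_hundred = F.udf(add_hundred, T.DoubleType())

df = spark.createDataFrame(
    [[1.0, 2.0]],
    schema=T.StructType([T.StructField(c, T.DoubleType(), True) for c in ["a", "b"]]),
)

out = df.withColumn("out1", udf_one(df["a"]))
out = out.repartition(2)   # shuffle boundary between the two Python UDFs
out = out.withColumn("out2", udf_hundred(out["b"]))
out.show()                 # action
print(acc.value)           # expected 101.0; the report sees 100.0 when no shuffle separates the UDFs
{code}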



> PySpark Accumulators with multiple PythonUDFs
> ---------------------------------------------
>
>                 Key: SPARK-25591
>                 URL: https://issues.apache.org/jira/browse/SPARK-25591
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.3.2
>            Reporter: Abdeali Kothari
>            Priority: Major


