[ https://issues.apache.org/jira/browse/SPARK-35745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17362929#comment-17362929 ]
Hadrien Glaude commented on SPARK-35745:
----------------------------------------
> This is the correct way to avoid this problem.
Why would this be the correct way to avoid the problem? There should be no need
to use asNondeterministic, since the pandas UDF IS deterministic.
> Serie to Scalar pandas_udf in GroupedData.agg() breaks the following
> monotonically_increasing_id()
> --------------------------------------------------------------------------------------------------
>
> Key: SPARK-35745
> URL: https://issues.apache.org/jira/browse/SPARK-35745
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.1.1, 3.1.2
> Environment: I was able to reproduce this with both
> pyspark == '3.1.1'
> pyarrow == '3.0.0'
> Python 3.7.10
> and
> pyspark == '3.1.2'
> pyarrow == '4.0.1'
> Python 3.7.9
> Reporter: Hadrien Glaude
> Priority: Major
>
> Hello,
> I encountered an issue when using a Series to Scalar {{pandas_udf}} in
> {{GroupedData.agg()}} followed by {{monotonically_increasing_id()}}: I
> get duplicate ids. The partition offset in the ids seems to be
> zero on all partitions. The problem is avoided by marking the UDF with
> {{asNondeterministic()}}.
> Minimal reproducible example
> {code:python}
> from pyspark.sql import SparkSession
> import pyspark.sql.functions as F
> from pyspark.sql.functions import pandas_udf
> import pandas as pd
> from pyspark.sql.types import IntegerType
>
> spark = SparkSession.builder\
>     .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
>     .config("spark.sql.shuffle.partitions", "8")\
>     .master("local[4]").getOrCreate()
>
> # Series to Scalar pandas UDF, left deterministic (the default)
> @pandas_udf(IntegerType())
> def sum_pandas(vals: pd.Series) -> int:
>     return int(vals.to_numpy().sum())
>
> # The same UDF, explicitly marked nondeterministic
> @pandas_udf(IntegerType())
> def sum_pandas2(vals: pd.Series) -> int:
>     return int(vals.to_numpy().sum())
>
> sum_pandas2 = sum_pandas2.asNondeterministic()
>
> l = [(i % 100, i) for i in range(2000)]
>
> # Deterministic UDF: group_id values are duplicated
> data = spark.createDataFrame(l, schema=["col1", "col2"])
> data.groupby("col1").agg(sum_pandas("col2").alias("sum"))\
>     .withColumn("group_id", F.monotonically_increasing_id()).show()
>
> # Nondeterministic UDF: group_id values carry the partition offset
> data = spark.createDataFrame(l, schema=["col1", "col2"])
> data.groupby("col1").agg(sum_pandas2("col2").alias("sum"))\
>     .withColumn("group_id", F.monotonically_increasing_id()).show()
> {code}
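> Output (first table: {{sum_pandas}}, deterministic; second table: {{sum_pandas2}}, marked nondeterministic)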
> {code}
> +----+-----+--------+
> |col1|  sum|group_id|
> +----+-----+--------+
> |   2|19040|       0|
> |  12|19240|       1|
> |  26|19520|       2|
> |  28|19560|       3|
> |  29|19580|       4|
> |  30|19600|       5|
> |  33|19660|       6|
> |  42|19840|       7|
> |  48|19960|       8|
> |  67|20340|       9|
> |  73|20460|      10|
> |  88|20760|      11|
> |  91|20820|      12|
> |  93|20860|      13|
> |   9|19180|       0|
> |  11|19220|       1|
> |  22|19440|       2|
> |  32|19640|       3|
> |  36|19720|       4|
> |  40|19800|       5|
> +----+-----+--------+
> only showing top 20 rows
> +----+-----+----------+
> |col1|  sum|  group_id|
> +----+-----+----------+
> |   2|19040|         0|
> |  12|19240|         1|
> |  26|19520|         2|
> |  28|19560|         3|
> |  29|19580|         4|
> |  30|19600|         5|
> |  33|19660|         6|
> |  42|19840|         7|
> |  48|19960|         8|
> |  67|20340|         9|
> |  73|20460|        10|
> |  88|20760|        11|
> |  91|20820|        12|
> |  93|20860|        13|
> |   9|19180|8589934592|
> |  11|19220|8589934593|
> |  22|19440|8589934594|
> |  32|19640|8589934595|
> |  36|19720|8589934596|
> |  40|19800|8589934597|
> +----+-----+----------+
> only showing top 20 rows
> {code}
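The PySpark documentation for monotonically_increasing_id() describes how to read the group_id values above. In the current implementation, the partition ID occupies the upper 31 bits and the record number within each partition occupies the lower 33 bits. The sketch below models only that documented layout in pure Python. It does not call Spark, and encode_id, decode_id and duplicated_ids are hypothetical helper names. Decoding 8589934592 gives (partition 1, record 0). In the first output, the rows after id 13 restart at 0 with partition bits of 0. This is consistent with the partition offset appearing to be zero.

```python
from typing import List, Set, Tuple

# Hypothetical helpers (not part of PySpark) modelling the documented layout of
# monotonically_increasing_id(): partition ID in the upper 31 bits, record
# number within the partition in the lower 33 bits.
RECORD_BITS = 33
RECORD_MASK = (1 << RECORD_BITS) - 1


def encode_id(partition_id: int, record_number: int) -> int:
    """Build an id following the documented bit layout."""
    return (partition_id << RECORD_BITS) | record_number


def decode_id(generated_id: int) -> Tuple[int, int]:
    """Split an id into (partition_id, record_number)."""
    return generated_id >> RECORD_BITS, generated_id & RECORD_MASK


def duplicated_ids(ids: List[int]) -> Set[int]:
    """Return every id that appears more than once."""
    seen: Set[int] = set()
    dups: Set[int] = set()
    for i in ids:
        if i in seen:
            dups.add(i)
        seen.add(i)
    return dups


# group_id values from the two outputs above (top 20 rows only).
deterministic_ids = list(range(14)) + list(range(6))
nondeterministic_ids = list(range(14)) + [encode_id(1, r) for r in range(6)]

print(decode_id(8589934592))                      # (1, 0)
print(sorted(duplicated_ids(deterministic_ids)))  # [0, 1, 2, 3, 4, 5]
print(duplicated_ids(nondeterministic_ids))       # set()
```

With a non-zero partition offset, ids from different partitions cannot collide, so an empty duplicate set is the expected result.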
--
This message was sent by Atlassian Jira
(v8.3.4#803005)