Hadrien Glaude created SPARK-35745:
--------------------------------------
Summary: Series to Scalar pandas_udf in GroupedData.agg() breaks
the following monotonically_increasing_id()
Key: SPARK-35745
URL: https://issues.apache.org/jira/browse/SPARK-35745
Project: Spark
Issue Type: Bug
Components: PySpark
Affects Versions: 3.1.2, 3.1.1
Environment: I was able to reproduce this with both
pyspark == '3.1.1'
pyarrow == '3.0.0'
Python 3.7.10
and
pyspark == '3.1.2'
pyarrow == '4.0.1'
Python 3.7.9
Reporter: Hadrien Glaude
Hello,
I encountered an issue when using a Series to Scalar {{pandas_udf}} in
{{GroupedData.agg()}} followed by {{monotonically_increasing_id()}}: I
obtain duplicated ids. The partition offset encoded in the id appears to be
zero on all partitions. The problem is avoided by marking the UDF with
{{asNondeterministic}}.
Minimal reproducing example:
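For context, {{monotonically_increasing_id()}} documents that it puts the partition ID in the upper 31 bits of the 64-bit id and the record number within each partition in the lower 33 bits, which is why a correct run jumps by 8589934592 (2^33) between partitions. A small decoding sketch, assuming that documented bit layout (the {{decode_id}} helper is illustrative, not a Spark API):

{code:python}
# monotonically_increasing_id() layout: id = (partition_id << 33) | row_in_partition
# (partition index in the upper 31 bits, per-partition record number in the lower 33 bits)

def decode_id(mid: int):
    """Split a monotonically_increasing_id value into (partition, row)."""
    partition = mid >> 33
    row = mid & ((1 << 33) - 1)
    return partition, row

# First row of the second partition, as seen in the correct output below:
print(decode_id(8589934592))  # (1, 0)
# In the buggy output every id decodes to partition 0, hence the duplicates:
print(decode_id(5))           # (0, 5)
{code}

Under this reading, the bug is that the per-partition counters are correct but the partition component is 0 everywhere.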
{code:python}
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf
import pandas as pd
from pyspark.sql.types import IntegerType

spark = SparkSession.builder\
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")\
    .config("spark.sql.shuffle.partitions", "8")\
    .master("local[4]").getOrCreate()

@pandas_udf(IntegerType())
def sum_pandas(vals: pd.Series) -> int:
    return int(vals.to_numpy().sum())

@pandas_udf(IntegerType())
def sum_pandas2(vals: pd.Series) -> int:
    return int(vals.to_numpy().sum())

sum_pandas2 = sum_pandas2.asNondeterministic()

l = [(i % 100, i) for i in range(2000)]

# Deterministic UDF: group ids collide across partitions.
data = spark.createDataFrame(l, schema=["col1", "col2"])
data.groupby("col1").agg(sum_pandas("col2").alias("sum"))\
    .withColumn("group_id", F.monotonically_increasing_id()).show()

# Non-deterministic UDF: group ids are unique as expected.
data = spark.createDataFrame(l, schema=["col1", "col2"])
data.groupby("col1").agg(sum_pandas2("col2").alias("sum"))\
    .withColumn("group_id", F.monotonically_increasing_id()).show(){code}
Output:
{code:none}
+----+-----+--------+
|col1| sum|group_id|
+----+-----+--------+
| 2|19040| 0|
| 12|19240| 1|
| 26|19520| 2|
| 28|19560| 3|
| 29|19580| 4|
| 30|19600| 5|
| 33|19660| 6|
| 42|19840| 7|
| 48|19960| 8|
| 67|20340| 9|
| 73|20460| 10|
| 88|20760| 11|
| 91|20820| 12|
| 93|20860| 13|
| 9|19180| 0|
| 11|19220| 1|
| 22|19440| 2|
| 32|19640| 3|
| 36|19720| 4|
| 40|19800| 5|
+----+-----+--------+
only showing top 20 rows
+----+-----+----------+
|col1| sum| group_id|
+----+-----+----------+
| 2|19040| 0|
| 12|19240| 1|
| 26|19520| 2|
| 28|19560| 3|
| 29|19580| 4|
| 30|19600| 5|
| 33|19660| 6|
| 42|19840| 7|
| 48|19960| 8|
| 67|20340| 9|
| 73|20460| 10|
| 88|20760| 11|
| 91|20820| 12|
| 93|20860| 13|
| 9|19180|8589934592|
| 11|19220|8589934593|
| 22|19440|8589934594|
| 32|19640|8589934595|
| 36|19720|8589934596|
| 40|19800|8589934597|
+----+-----+----------+
only showing top 20 rows
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)