Hadrien Glaude created SPARK-35745:
--------------------------------------

             Summary: Series to Scalar pandas_udf in GroupedData.agg() breaks 
the following monotonically_increasing_id()
                 Key: SPARK-35745
                 URL: https://issues.apache.org/jira/browse/SPARK-35745
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 3.1.2, 3.1.1
         Environment: I was able to reproduce this with both

pyspark == '3.1.1'
pyarrow == '3.0.0'
Python 3.7.10

and

pyspark == '3.1.2'
pyarrow == '4.0.1'
Python 3.7.9
            Reporter: Hadrien Glaude


Hello,

I encountered an issue when using a Series to Scalar {{pandas_udf}} in 
{{GroupedData.agg()}} followed by {{monotonically_increasing_id()}}: the 
generated ids are duplicated across partitions. The partition offset 
component of the id appears to be zero on every partition. Marking the 
UDF with {{asNondeterministic}} avoids the problem.
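For context (my own sketch, not part of the original report): per the Spark API docs, {{monotonically_increasing_id()}} puts the partition ID in the upper 31 bits and the per-partition record number in the lower 33 bits. The id 8589934592 in the correct output below is exactly {{1 << 33}}, i.e. the first row of partition 1; in the buggy output that partition component is missing, so every partition restarts at 0. The helper {{expected_id}} is my illustration, not a Spark function:
{code:python}
# Sketch (illustration only): how monotonically_increasing_id() composes an
# id, per the Spark API docs -- upper 31 bits hold the partition ID, lower
# 33 bits hold the record number within the partition.
def expected_id(partition_id: int, record_number: int) -> int:
    return (partition_id << 33) | record_number

print(expected_id(0, 0))  # 0          -- first row of partition 0
print(expected_id(1, 0))  # 8589934592 -- first row of partition 1
print(expected_id(1, 5))  # 8589934597
{code}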

Minimal reproducing example:
{code:python}
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.functions import pandas_udf
import pandas as pd
from pyspark.sql.types import IntegerType

spark = SparkSession.builder \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.sql.shuffle.partitions", "8") \
    .master("local[4]").getOrCreate()

# Series to Scalar pandas UDF (deterministic by default)
@pandas_udf(IntegerType())
def sum_pandas(vals: pd.Series) -> int:
    return int(vals.to_numpy().sum())

# Identical UDF, marked nondeterministic below
@pandas_udf(IntegerType())
def sum_pandas2(vals: pd.Series) -> int:
    return int(vals.to_numpy().sum())

sum_pandas2 = sum_pandas2.asNondeterministic()

l = [(i % 100, i) for i in range(2000)]

# Deterministic UDF: ids restart at 0 on every partition (buggy)
data = spark.createDataFrame(l, schema=["col1", "col2"])
data.groupby("col1").agg(sum_pandas("col2").alias("sum")) \
    .withColumn("group_id", F.monotonically_increasing_id()).show()

# Nondeterministic UDF: ids include the partition offset (expected)
data = spark.createDataFrame(l, schema=["col1", "col2"])
data.groupby("col1").agg(sum_pandas2("col2").alias("sum")) \
    .withColumn("group_id", F.monotonically_increasing_id()).show()
{code}
Output:
{code}
+----+-----+--------+
|col1|  sum|group_id|
+----+-----+--------+
|   2|19040|       0|
|  12|19240|       1|
|  26|19520|       2|
|  28|19560|       3|
|  29|19580|       4|
|  30|19600|       5|
|  33|19660|       6|
|  42|19840|       7|
|  48|19960|       8|
|  67|20340|       9|
|  73|20460|      10|
|  88|20760|      11|
|  91|20820|      12|
|  93|20860|      13|
|   9|19180|       0|
|  11|19220|       1|
|  22|19440|       2|
|  32|19640|       3|
|  36|19720|       4|
|  40|19800|       5|
+----+-----+--------+
only showing top 20 rows

+----+-----+----------+
|col1|  sum|  group_id|
+----+-----+----------+
|   2|19040|         0|
|  12|19240|         1|
|  26|19520|         2|
|  28|19560|         3|
|  29|19580|         4|
|  30|19600|         5|
|  33|19660|         6|
|  42|19840|         7|
|  48|19960|         8|
|  67|20340|         9|
|  73|20460|        10|
|  88|20760|        11|
|  91|20820|        12|
|  93|20860|        13|
|   9|19180|8589934592|
|  11|19220|8589934593|
|  22|19440|8589934594|
|  32|19640|8589934595|
|  36|19720|8589934596|
|  40|19800|8589934597|
+----+-----+----------+
only showing top 20 rows
{code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
