[
https://issues.apache.org/jira/browse/SPARK-37449?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17448539#comment-17448539
]
Carlos Gameiro edited comment on SPARK-37449 at 11/24/21, 10:49 AM:
--------------------------------------------------------------------
Sometimes there is no natural way to group a DataFrame into even partitions
before applying a Pandas UDF (applyInPandas). This matters for use cases that
require applying vectorized functions to arbitrary chunks of the dataset.
In these situations, the best workaround I have found is grouping the DataFrame
by the partition id ("spark_partition_id"). This gives something similar to a
"mapPartitions" operation on Spark DataFrames.
Are there any plans to include this feature for PySpark DataFrames?
> Side effects between PySpark Pandas UDF and Numpy indexing
> ----------------------------------------------------------
>
> Key: SPARK-37449
> URL: https://issues.apache.org/jira/browse/SPARK-37449
> Project: Spark
> Issue Type: Bug
> Components: PySpark
> Affects Versions: 3.1.2
> Reporter: Carlos Gameiro
> Priority: Major
> Labels: NumPy, Pandas, Pygeos, UDF, applyInPandas
>
> Let's create a simple pandas DataFrame with a single column named 'id' that
> contains a sequential range.
> {code:java}
> df = pd.DataFrame(np.arange(0,1000), columns=['id']){code}
> Consider this function that selects the first four elements of the 'id' column
> using a NumPy index array.
> {code:java}
> def udf_example(df):
>     some_index = np.array([0, 1, 2, 3])
>     values = df['id'].values[some_index]
>     df = pd.DataFrame(values, columns=['id'])
>     return df{code}
> If I apply this function in PySpark I get this result:
> {code:java}
> schema = t.StructType([t.StructField('id', t.LongType(), True)])
> df_spark = spark.createDataFrame(df).groupBy().applyInPandas(udf_example, schema)
> display(df_spark)
> # id
> # 125
> # 126
> # 127
> # 128
> {code}
> If I apply it in Python I get the correct and expected result:
> {code:java}
> udf_example(df)
> # id
> # 0
> # 1
> # 2
> # 3
> {code}
> Using NumPy indexing operations inside a Pandas UDF in Spark causes side
> effects and unexpected results.
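One defensive variant worth trying, purely as an untested assumption on my part and not a confirmed fix for the reported behavior, is to force an explicit copy of the column before indexing, so the fancy-indexing step operates on a fresh array rather than anything backed by the shared Arrow buffer. The function name `udf_example_copy` is hypothetical:

```python
import numpy as np
import pandas as pd

def udf_example_copy(df):
    some_index = np.array([0, 1, 2, 3])
    # to_numpy(copy=True) materializes a fresh contiguous ndarray, so the
    # fancy indexing below cannot observe any shared underlying buffer.
    values = df['id'].to_numpy(copy=True)[some_index]
    return pd.DataFrame(values, columns=['id'])

# Plain-Python check: behaves like the original udf_example does locally.
local = pd.DataFrame(np.arange(0, 1000), columns=['id'])
print(udf_example_copy(local)['id'].tolist())  # [0, 1, 2, 3]
```

Whether this changes the result inside `applyInPandas` on an affected Spark version would need to be verified against the cluster reproducing the bug.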
--
This message was sent by Atlassian Jira
(v8.20.1#820001)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]