[
https://issues.apache.org/jira/browse/SPARK-48311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17853582#comment-17853582
]
Sumit Singh commented on SPARK-48311:
-------------------------------------
I have created the PR based on the details explained in the doc.
> Nested pythonUDF in groupBy and aggregate result in Binding Exception
> ----------------------------------------------------------------------
>
> Key: SPARK-48311
> URL: https://issues.apache.org/jira/browse/SPARK-48311
> Project: Spark
> Issue Type: Bug
> Components: PySpark, SQL
> Affects Versions: 3.3.2
> Reporter: Sumit Singh
> Priority: Major
> Labels: pull-request-available
>
> Steps to Reproduce
> 1. Data creation
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql.types import StructType, StructField, LongType, TimestampType, StringType
> from datetime import datetime
>
> spark = SparkSession.builder.getOrCreate()
>
> # Define the schema
> schema = StructType([
>     StructField("col1", LongType(), nullable=True),
>     StructField("col2", TimestampType(), nullable=True),
>     StructField("col3", StringType(), nullable=True)
> ])
>
> # Define the data
> data = [
>     (1, datetime(2023, 5, 15, 12, 30), "Discount"),
>     (2, datetime(2023, 5, 16, 16, 45), "Promotion"),
>     (3, datetime(2023, 5, 17, 9, 15), "Coupon")
> ]
>
> # Create the DataFrame and register it as a temp view
> df = spark.createDataFrame(data, schema)
> df.createOrReplaceTempView("temp_offers")
>
> # Query the temporary view using SQL.
> # DISTINCT is required to reproduce the issue.
> testDf = spark.sql("""
>     SELECT DISTINCT
>         col1,
>         col2,
>         col3
>     FROM temp_offers
> """)
> {code}
> 2. UDF registration
> {code:python}
> import pyspark.sql.functions as F
> import pyspark.sql.types as T
>
> # Create the UDF functions
> def udf1(d):
>     return d
>
> def udf2(d):
>     if d.isoweekday() in (1, 2, 3, 4):
>         return 'WEEKDAY'
>     else:
>         return 'WEEKEND'
>
> udf1_name = F.udf(udf1, T.TimestampType())
> udf2_name = F.udf(udf2, T.StringType())
> {code}
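As a quick sanity check outside Spark, the `udf2` logic can be exercised in plain Python (the UDFs themselves behave correctly; the failure reported below appears to arise when planning the nested UDF under groupBy, not in the UDF logic). Note that this logic classifies Friday (isoweekday 5) as 'WEEKEND':

```python
from datetime import datetime

# Same classification logic as the registered udf2 above,
# reproduced in plain Python for a local check.
def udf2(d):
    if d.isoweekday() in (1, 2, 3, 4):
        return 'WEEKDAY'
    else:
        return 'WEEKEND'

# The three sample timestamps fall on Mon/Tue/Wed, 2023-05-15..17.
labels = [udf2(datetime(2023, 5, day, 12, 0)) for day in (15, 16, 17)]
print(labels)  # all three sample rows are weekdays
```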
> 3. Adding UDF in grouping and agg
> {code:python}
> groupBy_cols = ['col1', 'col4', 'col5', 'col3']
>
> temp = (testDf
>         .select('*', udf1_name(F.col('col2')).alias('col4'))
>         .select('*', udf2_name('col4').alias('col5')))
>
> result = temp.groupBy(*groupBy_cols).agg(F.countDistinct('col5').alias('col6'))
> {code}
> 4. Result
> {code:python}
> result.show(5, False)
> {code}
> *We get the error below:*
> {code:java}
> An error was encountered:
> An error occurred while calling o1079.showString.
> : java.lang.IllegalStateException: Couldn't find pythonUDF0#1108 in [col1#978L,groupingPythonUDF#1104,groupingPythonUDF#1105,col3#980,count(pythonUDF0#1108)#1080L]
>     at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:80)
>     at org.apache.spark.sql.catalyst.expressions.BindReferences$$anonfun$bindReference$1.applyOrElse(BoundAttribute.scala:73)
> {code}
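One possible workaround (an untested sketch, not taken from the ticket): since `udf1` is the identity function, the two UDFs can be collapsed into a single function so that no Python UDF output is fed into another Python UDF among the groupBy columns. The `day_type` name and the registration shown in comments are illustrative assumptions, not a confirmed fix:

```python
from datetime import datetime

def day_type(d):
    # Combines udf1 (identity on the timestamp) and udf2 (weekday/weekend
    # classification) so only one Python UDF appears in the groupBy columns.
    return 'WEEKDAY' if d.isoweekday() in (1, 2, 3, 4) else 'WEEKEND'

# Hypothetical registration, mirroring the original F.udf calls:
#   day_type_udf = F.udf(day_type, T.StringType())
#   temp = testDf.select('*', F.col('col2').alias('col4'),
#                        day_type_udf('col2').alias('col5'))
```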
--
This message was sent by Atlassian Jira
(v8.20.10#820010)