Kaushalya created SPARK-25898:
---------------------------------

             Summary: pandas_udf 
                 Key: SPARK-25898
                 URL: https://issues.apache.org/jira/browse/SPARK-25898
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.3.2
            Reporter: Kaushalya


Errors in the implementation of a pandas_udf do not surface at the first point of invocation; they only manifest later, when the returned DataFrame is accessed. The issue does not show up with small datasets, only when the data is distributed across servers.
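For context, the ValueError quoted below seems to come straight from numpy whenever a group contains fewer than two rows; a minimal plain-numpy sketch of that failure mode (made-up values, independent of Spark):
{code:java}
import numpy as np

# np.gradient needs at least (edge_order + 1) == 2 elements per array.
np.gradient(np.array([1.0, 2.0, 4.0]))  # fine: array([1. , 1.5, 2. ])
np.gradient(np.array([1.0]))            # ValueError: Shape of array too small ...
{code}
The repro against the distributed dataset: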
{code:java}
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pyspark.sql import SparkSession
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

spark = SparkSession.builder.appName("myapp").getOrCreate()

one_second_df = spark.read.parquet("data")

# Output schema: the input schema plus the computed gradient column.
schema = StructType(one_second_df.schema.fields +
                    [StructField("gradient_order_rate", DoubleType())])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def gradient(df):
    df["gradient_order_rate"] = np.gradient(df.val_count)
    return df

agg_df = one_second_df.groupby("id").apply(gradient)
agg_df.show(2)

# THE FOLLOWING LINES THREW ERRORS such as:
# ValueError: Shape of array too small to calculate a numerical gradient,
# at least (edge_order + 1) elements are required.

fd = agg_df.filter(agg_df["id"] == "a")
fd.show(5)

{code}
Weirdly, this error is thrown only at the second show command, which should technically have run through the entire dataset but probably does not. I think this should be fixed, because errors that manifest down the line are difficult to catch if they are not identifiable earlier on.
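For what it is worth, a sketch of two possible workarounds I am considering (not verified against the full dataset; gradient_safe is just an illustrative name, and schema and one_second_df are the same as in the snippet above):
{code:java}
# Sketch only.

# (1) Guard the UDF against groups too small for np.gradient
#     (np.gradient needs at least edge_order + 1 == 2 elements).
@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def gradient_safe(df):
    if len(df) < 2:
        df["gradient_order_rate"] = np.nan   # or 0.0, whatever makes sense
    else:
        df["gradient_order_rate"] = np.gradient(df.val_count)
    return df

agg_df = one_second_df.groupby("id").apply(gradient_safe)

# (2) Force a full evaluation right after apply(), so any UDF error
#     surfaces here rather than at some later, unrelated action.
agg_df.cache()
agg_df.count()
{code}
Since count() has to evaluate every group, a failing group should surface at that point instead of at a later filter or show.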
 

Please also find working code for a similar workflow, but with a much smaller dataset:
{code:java}
import pandas as pd
from pyspark.sql.functions import pandas_udf, PandasUDFType
from pyspark.sql.types import *

df3 = spark.createDataFrame(
    [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
    ("key", "value1", "value2")
)

schema = StructType([
    StructField("key", StringType()),
    StructField("avg_value1", DoubleType()),
    StructField("avg_value2", DoubleType()),
    StructField("sum_avg", DoubleType()),
    StructField("sub_avg", DoubleType())
])

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def g(df):
    gr = df['key'].iloc[0]
    x = df.value1.mean()
    y = df.value2.mean()
    w = df.value1.mean() + df.value2.mean()
    z = df.value1.mean() - df.value2.mean()
    return pd.DataFrame([[gr] + [x] + [y] + [w] + [z]])

ss = df3.groupby("key").apply(g)
ss.show(5)
my_filter = ss.filter(ss["key"] == 'a')
my_filter.show(2)
{code}
 

 


