[ 
https://issues.apache.org/jira/browse/SPARK-25898?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kaushalya updated SPARK-25898:
------------------------------
    Summary: pandas_udf not catching errors early on  (was: pandas_udf )

> pandas_udf not catching errors early on
> ---------------------------------------
>
>                 Key: SPARK-25898
>                 URL: https://issues.apache.org/jira/browse/SPARK-25898
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.3.2
>            Reporter: Kaushalya
>            Priority: Major
>
> pandas_udf does not surface implementation errors at the first point of 
> invocation. The errors only manifest later, when the returned DataFrame is 
> accessed. The issue does not appear with small datasets, only when the data 
> is distributed across multiple executors. 
> {code:python}
> import numpy as np
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import StructType, StructField, DoubleType
> 
> spark = SparkSession.builder.appName("myapp").getOrCreate()
> one_second_df = spark.read.parquet("data")
> schema = StructType(one_second_df.schema.fields +
>                     [StructField("gradient_order_rate", DoubleType())])
> 
> @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
> def gradient(df):
>     df["gradient_order_rate"] = np.gradient(df.val_count)
>     return df
> 
> agg_df = one_second_df.groupby("id").apply(gradient)
> agg_df.show(2)
> # The following lines threw errors such as:
> # ValueError: Shape of array too small to calculate a numerical gradient,
> # at least (edge_order + 1) elements are required.
> fd = agg_df.filter(agg_df["id"] == "a")
> fd.show(5)
> {code}
> Oddly, this error is only thrown after the second show command, even though 
> the first show should in principle already have run through the entire 
> dataset (it apparently does not). This should be fixed: errors that only 
> manifest this far down the line are difficult to catch when they are not 
> identifiable earlier on. 
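The underlying ValueError comes from NumPy rather than Spark: `np.gradient` requires at least `edge_order + 1` (by default, 2) elements, so any group that reaches the UDF with a single row fails. A minimal standalone reproduction, independent of Spark:

```python
import numpy as np

# A single-element array is too small for a numerical gradient.
try:
    np.gradient(np.array([1.0]))
except ValueError as e:
    print("raised:", e)

# With two or more elements the call succeeds.
print(np.gradient(np.array([1.0, 3.0, 6.0])))  # gradient is [2.0, 2.5, 3.0]
```

Because the UDF only runs when an action actually pulls those partitions, this exception surfaces wherever the offending group happens to be evaluated, not at definition time.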
>  
> Please also find working code for a similar workflow, but with a much 
> smaller dataset:
> {code:python}
> import pandas as pd
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import StructType, StructField, StringType, DoubleType
> 
> df3 = spark.createDataFrame(
>     [("a", 1, 0), ("a", -1, 42), ("b", 3, -1), ("b", 10, -2)],
>     ("key", "value1", "value2")
> )
> 
> schema = StructType([
>     StructField("key", StringType()),
>     StructField("avg_value1", DoubleType()),
>     StructField("avg_value2", DoubleType()),
>     StructField("sum_avg", DoubleType()),
>     StructField("sub_avg", DoubleType())
> ])
> 
> @pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
> def g(df):
>     gr = df['key'].iloc[0]
>     x = df.value1.mean()
>     y = df.value2.mean()
>     w = x + y
>     z = x - y
>     return pd.DataFrame([[gr, x, y, w, z]])
> 
> ss = df3.groupby("key").apply(g)
> ss.show(5)
> my_filter = ss.filter(ss["key"] == 'a')
> my_filter.show(2)
> {code}
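Until pandas_udf validates earlier, one workaround is to guard the UDF body against groups with fewer rows than `np.gradient` needs. This is a sketch under assumptions from the first snippet (a `val_count` column and a `gradient_order_rate` output field); since a GROUPED_MAP UDF just receives a pandas DataFrame per group, the per-group logic can be exercised with plain pandas, without a cluster:

```python
import numpy as np
import pandas as pd

def gradient(pdf):
    # Hypothetical guard: np.gradient needs at least 2 rows per group.
    if len(pdf) < 2:
        pdf["gradient_order_rate"] = np.nan
    else:
        pdf["gradient_order_rate"] = np.gradient(pdf.val_count)
    return pdf

# Exercise the per-group logic directly:
print(gradient(pd.DataFrame({"val_count": [5.0]})))            # NaN for a 1-row group
print(gradient(pd.DataFrame({"val_count": [1.0, 3.0, 6.0]})))  # gradient 2.0, 2.5, 3.0
```

In Spark the same function would be wrapped with `@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)` as in the snippets above; testing the bare function first makes the failure reproducible locally.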



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
