Omri created SPARK-23883:
----------------------------

             Summary: Error with conversion to arrow while using pandas_udf
                 Key: SPARK-23883
                 URL: https://issues.apache.org/jira/browse/SPARK-23883
             Project: Spark
          Issue Type: Bug
          Components: PySpark
    Affects Versions: 2.3.0
         Environment: Spark 2.3.0
Python 3.5
Java 1.8.0_161-b12
            Reporter: Omri


Hi,

I have code that works on Databricks but fails on a local Spark installation.

This is the code I'm running:
{code:java}
from pyspark.sql.functions import pandas_udf
import pandas as pd
import numpy as np
from pyspark.sql.types import *

schema = StructType([
    StructField("Distance", FloatType()),
    StructField("CarId", IntegerType())
])

def haversine(lon1, lat1, lon2, lat2):
    # Calculate distance, return scalar
    return 3.5  # Removed logic to facilitate reading

@pandas_udf(schema)
def totalDistance(oneCar):
    dist = haversine(oneCar.Longtitude.shift(1),
                     oneCar.Latitude.shift(1),
                     oneCar.loc[1:, 'Longitude'],
                     oneCar.loc[1:, 'Latitude'])
    return pd.DataFrame({"CarId": oneCar['CarId'].iloc[0], "Distance": np.sum(dist)}, index=[0])

## Calculate the overall distance made by each car
distancePerCar = df.groupBy('CarId').apply(totalDistance)
{code}
I'm getting this exception, which says that Arrow cannot handle this return type:
{noformat}
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
    114                 try:
--> 115                     to_arrow_type(self._returnType_placeholder)
    116                 except TypeError:

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\types.py in to_arrow_type(dt)
   1641     else:
-> 1642         raise TypeError("Unsupported type in conversion to Arrow: " + str(dt))
   1643     return arrow_type

TypeError: Unsupported type in conversion to Arrow: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true)))

During handling of the above exception, another exception occurred:

NotImplementedError                       Traceback (most recent call last)
<ipython-input-35-4f2194cfb998> in <module>()
     18     km = 6367 * c
     19     return km
---> 20 @pandas_udf("CarId: int, Distance: float")
     21 def totalDistance(oneUser):
     22     dist = haversine(oneUser.Longtitude.shift(1), oneUser.Latitude.shift(1),

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _create_udf(f, returnType, evalType)
     62     udf_obj = UserDefinedFunction(
     63         f, returnType=returnType, name=None, evalType=evalType, deterministic=True)
---> 64     return udf_obj._wrapped()
     65
     66

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in _wrapped(self)
    184
    185         wrapper.func = self.func
--> 186         wrapper.returnType = self.returnType
    187         wrapper.evalType = self.evalType
    188         wrapper.deterministic = self.deterministic

C:\opt\spark\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\udf.py in returnType(self)
    117                     raise NotImplementedError(
    118                         "Invalid returnType with scalar Pandas UDFs: %s is "
--> 119                         "not supported" % str(self._returnType_placeholder))
    120         elif self.evalType == PythonEvalType.SQL_GROUPED_MAP_PANDAS_UDF:
    121             if isinstance(self._returnType_placeholder, StructType):

NotImplementedError: Invalid returnType with scalar Pandas UDFs: StructType(List(StructField(CarId,IntegerType,true),StructField(Distance,FloatType,true))) is not supported
{noformat}
I've also tried changing the schema to
{code:java}
@pandas_udf("<CarId:int,Distance:float>")
{code}
and
{code:java}
@pandas_udf("CarId:int,Distance:float")
{code}

As mentioned, this works on a Databricks instance in Azure, but not locally.
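
The second exception names "scalar Pandas UDFs", which suggests that the decorator was registered with the default scalar function type even though the function is applied per group. Below is a minimal sketch of declaring it as a grouped-map UDF instead, assuming Spark 2.3's `PandasUDFType.GROUPED_MAP`. The names `total_distance` and `as_grouped_map_udf`, the vectorized `haversine` body (a stand-in for the removed logic, using the 6367 km radius visible in the traceback), the `Longitude`/`Latitude` column names, and the explicit output column order are illustrative assumptions, not the reporter's code. Whether this resolves the problem in the reported environment has not been verified.

```python
import numpy as np
import pandas as pd

EARTH_RADIUS_KM = 6367  # taken from "km = 6367 * c" in the traceback


def haversine(lon1, lat1, lon2, lat2):
    """Vectorized great-circle distance in km (assumed stand-in for the removed logic)."""
    lon1, lat1, lon2, lat2 = (
        np.radians(np.asarray(v, dtype=float)) for v in (lon1, lat1, lon2, lat2)
    )
    a = (np.sin((lat2 - lat1) / 2) ** 2
         + np.cos(lat1) * np.cos(lat2) * np.sin((lon2 - lon1) / 2) ** 2)
    return EARTH_RADIUS_KM * 2 * np.arcsin(np.sqrt(a))


def total_distance(one_car):
    """Plain pandas function: one group's rows in, a one-row DataFrame out."""
    prev = one_car[["Longitude", "Latitude"]].shift(1)
    dist = haversine(prev["Longitude"], prev["Latitude"],
                     one_car["Longitude"], one_car["Latitude"])
    # The first row has no predecessor, so its distance is NaN and is skipped.
    return pd.DataFrame(
        {"CarId": [one_car["CarId"].iloc[0]], "Distance": [np.nansum(dist)]},
        columns=["CarId", "Distance"],  # fixed order, matching the schema string below
    )


def as_grouped_map_udf(func, schema="CarId int, Distance float"):
    """Wrap func as a grouped-map Pandas UDF (hypothetical helper)."""
    # Imported lazily so the pandas part can be tried without Spark installed.
    from pyspark.sql.functions import pandas_udf, PandasUDFType
    return pandas_udf(schema, PandasUDFType.GROUPED_MAP)(func)
```

With Spark and pyarrow available, the call from the report would then read `distancePerCar = df.groupBy('CarId').apply(as_grouped_map_udf(total_distance))`. Keeping `total_distance` undecorated also makes it possible to check the per-group logic on a small pandas DataFrame before involving Spark.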