[ 
https://issues.apache.org/jira/browse/SPARK-29367?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17026292#comment-17026292
 ] 

Dongjoon Hyun commented on SPARK-29367:
---------------------------------------

The adjusted documentation has been added to `branch-2.4` via 
[https://github.com/apache/spark/pull/27383].
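
For readers hitting the error quoted below: the workaround described in the Spark SQL guide for PyArrow >= 0.15.0 with Spark 2.3.x and 2.4.x is to set the environment variable `ARROW_PRE_0_15_IPC_FORMAT=1` wherever the Python workers run (for example in `conf/spark-env.sh`). The following is only a minimal sketch of that decision; the helper names `version_tuple`, `needs_legacy_arrow_ipc` and `legacy_arrow_env` are hypothetical and are not part of Spark or PyArrow.

```python
import os
import re


def version_tuple(version):
    """Return the (major, minor) part of a dotted version string as ints."""
    fields = version.split(".")[:2]
    parts = []
    for field in fields:
        match = re.match(r"\d+", field)
        parts.append(int(match.group()) if match else 0)
    while len(parts) < 2:
        parts.append(0)
    return tuple(parts)


def needs_legacy_arrow_ipc(pyarrow_version, spark_version):
    """True when PyArrow >= 0.15 is combined with Spark 2.3.x or 2.4.x.

    Assumption: this mirrors the version ranges named in the compatibility
    note; it is not an official Spark check.
    """
    new_pyarrow = version_tuple(pyarrow_version) >= (0, 15)
    old_spark = (2, 3) <= version_tuple(spark_version) < (3, 0)
    return new_pyarrow and old_spark


def legacy_arrow_env(pyarrow_version, spark_version, base_env=None):
    """Return a copy of the environment with the legacy IPC flag set if needed."""
    env = dict(os.environ if base_env is None else base_env)
    if needs_legacy_arrow_ipc(pyarrow_version, spark_version):
        # Ask PyArrow >= 0.15 to write the pre-0.15 Arrow IPC stream format.
        env["ARROW_PRE_0_15_IPC_FORMAT"] = "1"
    return env
```

On a cluster the variable also has to reach the executors; one way that should work is Spark's `spark.executorEnv.[EnvironmentVariableName]` configuration, here `spark.executorEnv.ARROW_PRE_0_15_IPC_FORMAT=1`.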

> pandas udf not working with latest pyarrow release (0.15.0)
> -----------------------------------------------------------
>
>                 Key: SPARK-29367
>                 URL: https://issues.apache.org/jira/browse/SPARK-29367
>             Project: Spark
>          Issue Type: Documentation
>          Components: PySpark
>    Affects Versions: 2.4.0, 2.4.1, 2.4.3
>            Reporter: Julien Peloton
>            Assignee: Bryan Cutler
>            Priority: Major
>             Fix For: 2.4.5, 3.0.0
>
>
> Hi,
> I recently upgraded pyarrow from 0.14 to 0.15 (released on Oct 5th), and my 
> pyspark jobs using pandas udf are failing with 
> java.lang.IllegalArgumentException (tested with Spark 2.4.0, 2.4.1, and 
> 2.4.3). Here is a full example to reproduce the failure with pyarrow 0.15:
> {code:python}
> from pyspark.sql import SparkSession
> from pyspark.sql.functions import pandas_udf, PandasUDFType
> from pyspark.sql.types import BooleanType
> import pandas as pd
> @pandas_udf(BooleanType(), PandasUDFType.SCALAR)
> def qualitycuts(nbad: pd.Series, rb: pd.Series, magdiff: pd.Series) -> pd.Series:
>     """ Apply simple quality cuts
>     Returns
>     ----------
>     out: pandas.Series of booleans
>         Flag for each alert: false for bad alert,
>         and true for good alert.
>     """
>     mask = nbad.values == 0
>     mask *= rb.values >= 0.55
>     mask *= abs(magdiff.values) <= 0.1
>     return pd.Series(mask)
> spark = SparkSession.builder.getOrCreate()
> # Create dummy DF
> colnames = ["nbad", "rb", "magdiff"]
> df = spark.sparkContext.parallelize(
>     zip(
>         [0, 1, 0, 0],
>         [0.01, 0.02, 0.6, 0.01],
>         [0.02, 0.05, 0.1, 0.01]
>     )
> ).toDF(colnames)
> df.show()
> # Apply cuts
> df = df\
>     .withColumn("toKeep", qualitycuts(*colnames))\
>     .filter("toKeep == true")\
>     .drop("toKeep")
> # This will fail if latest pyarrow 0.15.0 is used
> df.show()
> {code}
> and the log is:
> {code}
> Driver stacktrace:
> 19/10/07 09:37:49 INFO DAGScheduler: Job 3 failed: showString at NativeMethodAccessorImpl.java:0, took 0.660523 s
> Traceback (most recent call last):
>   File "/Users/julien/Documents/workspace/myrepos/fink-broker/test_pyarrow.py", line 44, in <module>
>     df.show()
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/dataframe.py", line 378, in show
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
>   File "/Users/julien/Documents/workspace/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling o64.showString.
> : org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 3.0 failed 1 times, most recent failure: Lost task 0.0 in stage 3.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException
>       at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
>       at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
>       at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
>       at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
>       at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
>       at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
>       at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
>       at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
>       at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
>       at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:406)
>       at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>       at org.apache.spark.sql.execution.python.ArrowEvalPythonExec$$anon$2.<init>(ArrowEvalPythonExec.scala:98)
>       at org.apache.spark.sql.execution.python.ArrowEvalPythonExec.evaluate(ArrowEvalPythonExec.scala:96)
>       at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:127)
>       at org.apache.spark.sql.execution.python.EvalPythonExec$$anonfun$doExecute$1.apply(EvalPythonExec.scala:89)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
>       at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:801)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:121)
>       at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408)
>       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> I am not sure what the root cause of this failure is, but I note that there 
> is an open ticket (https://issues.apache.org/jira/browse/ARROW-6429) 
> suggesting that some work is ongoing on the Spark side.
> I guess any user upgrading pyarrow would face the same error right away, so 
> any help or feedback would be appreciated.
> Thanks,
> Julien



