[ https://issues.apache.org/jira/browse/SPARK-30063?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16986817#comment-16986817 ]

Ruben Berenguel commented on SPARK-30063:
-----------------------------------------

Wow, this looks bad for now (since grouped_aggs are one of the "super cool 
things you can do now in Arrow/Pandas"); it means we may need some kind of 
regression testing for Arrow "usage" in PySpark, or some stronger version 
pinning.

Tagging more knowledgeable people in this area of Spark so they are aware too: 
[~hyukjin.kwon] [~bryanc]

Thanks [~tkellogg], this definitely seems to be enough to move it forward (way 
more than enough, big thanks!)
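
For what it's worth, "stronger version pinning" could be as blunt as a guard in 
the Arrow-related test setup (the version bounds below are purely illustrative, 
not the officially supported range):

{code:python}
# Hypothetical regression guard for Arrow "usage" in PySpark: fail fast when the
# installed pyarrow drifts outside a range the test suite has been verified against.
from distutils.version import LooseVersion

import pyarrow

LOWER, UPPER = "0.8.0", "0.15.0"  # illustrative bounds only

if not (LooseVersion(LOWER) <= LooseVersion(pyarrow.__version__) < LooseVersion(UPPER)):
    raise RuntimeError(
        "pyarrow %s has not been verified against this PySpark build" % pyarrow.__version__)
{code}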

> Failure when returning a value from multiple Pandas UDFs
> --------------------------------------------------------
>
>                 Key: SPARK-30063
>                 URL: https://issues.apache.org/jira/browse/SPARK-30063
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 2.4.3, 2.4.4
>         Environment: Happens on Mac & Ubuntu (Docker). Seems to happen on 
> both 2.4.3 and 2.4.4
>            Reporter: Tim Kellogg
>            Priority: Major
>         Attachments: spark-debug.txt, variety-of-schemas.ipynb
>
>
> I have 20 Pandas UDFs that I'm trying to evaluate all at the same time.
>  * PandasUDFType.GROUPED_AGG
>  * 3 columns in the input data frame being serialized over Arrow to the 
> Python worker. See below for clarification.
>  * All functions take 2 parameters, some combination of the 3 received as 
> Arrow input.
>  * Varying return types, see details below.
> _*I get an IllegalArgumentException on the Scala side of the worker when 
> deserializing from Python.*_
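> For illustration, a minimal sketch of the shape of the job (column, metric, 
> and DataFrame names are placeholders, not the attached notebook):
> {code:python}
> # Sketch only, assuming Spark 2.4 pandas UDFs: many GROUPED_AGG UDFs with varying
> # return types, all evaluated in a single agg() over three double-typed columns
> # (label, prediction, probability) of a DataFrame `df`.
> from pyspark.sql import functions as F
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> @pandas_udf("float", PandasUDFType.GROUPED_AGG)
> def accuracy(label, prediction):
>     return float((label == prediction).mean())
>
> @pandas_udf("int", PandasUDFType.GROUPED_AGG)
> def count_labels(label):
>     return int(label.count())
>
> @pandas_udf("array<float>", PandasUDFType.GROUPED_AGG)
> def roc_points(label, probability):
>     return [float(p) for p in probability.sort_values()]
>
> # ...roughly 20 UDFs like these in total, then all of them at once:
> metrics = df.groupBy().agg(
>     accuracy(F.col("label"), F.col("prediction")).alias("accuracy"),
>     count_labels(F.col("label")).alias("count"),
>     roc_points(F.col("label"), F.col("probability")).alias("rocCurve"))
> {code}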
> h2. Exception & Stack Trace
> {code:java}
> 19/11/27 11:38:36 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
> java.lang.IllegalArgumentException
>       at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
>       at 
> org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
>       at 
> org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
>       at 
> org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
>       at 
> org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
>       at 
> org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
>       at 
> org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
>       at 
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
>       at 
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
>       at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:123)
>       at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> 19/11/27 11:38:36 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, 
> localhost, executor driver): java.lang.IllegalArgumentException
>       at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
>       at 
> org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
>       at 
> org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
>       at 
> org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
>       at 
> org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
>       at 
> org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
>       at 
> org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
>       at 
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
>       at 
> org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
>       at 
> org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
>       at 
> org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>       at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
>       at 
> org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>       at 
> org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>       at 
> org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>       at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>       at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>       at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>       at org.apache.spark.scheduler.Task.run(Task.scala:123)
>       at 
> org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>       at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>       at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>       at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>       at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>       at java.lang.Thread.run(Thread.java:748)
> {code}
> h2. Input Arrow Schema
> I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out the 
> schema & message. This is the input; in load_stream, the debug code is 
> print(batch, batch.schema, file=log_file)
> {code:java}
> <pyarrow.lib.RecordBatch object at 0x10640ecc8> 
> _0: double
> _1: double
> _2: double
> metadata
> --------
> OrderedDict()
> {code}
> h2. Output Arrow Schema
> I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out 
> schema & message. This is the output, in dump_stream, the code is 
> print(batch, batch.schema, file=log_file)
> {code:java}
> <pyarrow.lib.RecordBatch object at 0x11ad5b638> _0: float
> _1: float
> _2: float
> _3: int32
> _4: int32
> _5: int32
> _6: int32
> _7: int32
> _8: float
> _9: float
> _10: int32
> _11: list<item: float>
>   child 0, item: float
> _12: list<item: float>
>   child 0, item: float
> _13: float
> _14: float
> _15: int32
> _16: float
> _17: list<item: float>
>   child 0, item: float
> _18: list<item: float>
>   child 0, item: float
> _19: float
> {code}
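> For reference, a rough sketch of this kind of instrumentation. I edited the 
> file directly, but an equivalent (illustrative, not exact) monkey-patch of the 
> same serializer layer is easier to show in isolation; it assumes Spark 2.4's 
> pyspark.serializers layout, and the log path is arbitrary.
> {code:python}
> # Debug-only sketch: wrap ArrowStreamSerializer.load_stream / dump_stream (the base
> # class ArrowStreamPandasSerializer delegates batch I/O to) so every Arrow
> # RecordBatch and its schema are logged before/after the pandas conversion.
> from pyspark import serializers
>
> log_file = open("/tmp/arrow-debug.log", "a")
>
> _orig_load = serializers.ArrowStreamSerializer.load_stream
> _orig_dump = serializers.ArrowStreamSerializer.dump_stream
>
> def logged_load_stream(self, stream):
>     for batch in _orig_load(self, stream):   # incoming RecordBatches (input side)
>         print(batch, batch.schema, file=log_file)
>         yield batch
>
> def logged_dump_stream(self, iterator, stream):
>     def logged(batches):
>         for batch in batches:                # outgoing RecordBatches (output side)
>             print(batch, batch.schema, file=log_file)
>             yield batch
>     return _orig_dump(self, logged(iterator), stream)
>
> serializers.ArrowStreamSerializer.load_stream = logged_load_stream
> serializers.ArrowStreamSerializer.dump_stream = logged_dump_stream
> {code}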
> h2. Arrow Message
> I edited ArrowPythonRunner.scala at line 163 to print out the Arrow message.
> Debug code:
> {code:java}
> // Append a hex dump of (up to) the first 40000 bytes of the incoming Arrow
> // stream to spark-debug.txt; unread tail bytes stay zero, hence the trailing 0s.
> val fw = new java.io.FileWriter("spark-debug.txt", true)
> try {
>   val buf = new Array[Byte](40000)
>   stream.read(buf)
>   fw.write("Spark reader\n")
>   for (b <- buf) {
>     fw.write(String.format("%02x", Byte.box(b)))
>   }
>   fw.write("\n")
> } finally fw.close()
> {code}
> Debug output (some trailing 0's included for completeness).
> {code:java}
> ffffffff900400001000000000000a000c000600050008000a000000000103000c000000080008000000040008000000040000001400000030040000f4030000c803000090030000600300003003000000030000d0020000a40200007802000048020000ec01000094010000680100003c0100000c010000e000000088000000300000000400000030fcffff00000103180000000c00000004000000000000001efcffff00000100030000005f31390058fcffff0000010c440000001000000004000000010000000800000060feffff78fcffff00000103180000000c000000040000000000000066fcffff00000100040000006974656d00000000030000005f313800acfcffff0000010c4400000010000000040000000100000008000000b4feffffccfcffff00000103180000000c0000000400000000000000bafcffff00000100040000006974656d00000000030000005f31370000fdffff00000103180000000c0000000400000000000000eefcffff00000100030000005f31360028fdffff000001021c0000000c0000000400000000000000acfdffff0000000120000000030000005f31350054fdffff00000103180000000c000000040000000000000042fdffff00000100030000005f3134007cfdffff00000103180000000c00000004000000000000006afdffff00000100030000005f313300a4fdffff0000010c4400000010000000040000000100000008000000acffffffc4fdffff00000103180000000c0000000400000000000000b2fdffff00000100040000006974656d00000000030000005f313200f8fdffff0000010c480000001400000004000000010000000c00000004000400040000001cfeffff00000103180000000c00000004000000000000000afeffff00000100040000006974656d00000000030000005f31310050feffff000001021c0000000c0000000400000000000000d4feffff0000000120000000030000005f3130007cfeffff00000103180000000c00000004000000000000006afeffff00000100020000005f390000a4feffff00000103180000000c000000040000000000000092feffff00000100020000005f380000ccfeffff000001021c0000000c000000040000000000000050ffffff0000000120000000020000005f370000f8feffff000001021c0000000c00000004000000000000007cffffff0000000120000000020000005f36000024ffffff000001021c0000000c0000000400000000000000a8ffffff0000000120000000020000005f35000050ffffff000001021c0000000c0000000400000000000000d4ffffff0000000120000000020000005f3400007cffffff000001022400000014000000040000000000000008000c0008000700080000000000000120000000020000005f330000b0ffffff00000103180000000c00000004000000000000009effffff00000100020000005f320000d8ffffff00000103180000000c0000000400000000000000c6ffffff00000100020000005f310000100014000800060007000c000000100010000000000001032000000014000000040000000000000000000600080006000600000000000100020000005f300000ffffffffd804000014000000000000000c0016000600050008000c000c0000000003030018000000e00000000000000000000a0018000c00040008000a0000001c030000100000000100000000000000000000003000000000000000000000000000000000000000000000000000000008000000000000000800000000000000000000000000000008000000000000000800000000000000100000000000000000000000000000001000000000000000080000000000000018000000000000000000000000000000180000000000000008000000000000002000000000000000000000000000000020000000000000000800000000000000280000000000000000000000000000002800000000000000080000000000000030000000000000000000000000000000300000000000000008000000000000003800000000000000000000000000000038000000000000000800000000000000400000000000000000000000000000004000000000000000080000000000000048000000000000000000000000000000480000000000000008000000000000005000000000000000000000000000000050000000000000000800000000000000580000000000000000000000000000005800000000000000080000000000000060000000000000000000000000000000600000000000000010000000000000007000000000000000000000000000000070000000000000000800000000000000780000000000000000000000000000007800000000000000100000000000000088000000000000000000000000000000880000000000000008000000000000009
0000000000000000000000000000000900000000000000008000000000000009800000000000000000000000000000098000000000000000800000000000000a0000000000000000000000000000000a0000000000000000800000000000000a8000000000000000000000000000000a8000000000000000800000000000000b0000000000000000000000000000000b0000000000000001000000000000000c0000000000000000000000000000000c0000000000000000800000000000000c8000000000000000000000000000000c8000000000000001000000000000000d8000000000000000000000000000000d800000000000000080000000000000000000000180000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000300000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000003000000000000000000000000000000010000000000000000000000000000000000003f000000000000203f000000000000003f00000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000abaa2a3f00000000320000000000000000000000030000000000803f0000003f000000000000000000000000030000000000003f0000003f0000803f0000000000000000000000000000003f00000000200000000000000000000000000000000000000003000000000000000000003f0000803f000000000000000003000000000000000000003f0000803f000000000000803f00000000ffffffff00000000fffffffd0000016eae25ec840000016eae25f77b0000016eae25f7ac00000000000000000000000000000000ffffffff00000000fffffffc0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
> {code}
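> As a sanity check, the captured bytes can be decoded and fed back into pyarrow 
> to see whether the header parses as a valid Arrow IPC stream (hypothetical 
> snippet; the file layout matches the Scala writer above):
> {code:python}
> import binascii
> import pyarrow as pa
>
> with open("spark-debug.txt") as f:
>     lines = [line.strip() for line in f if line.strip()]
> raw = binascii.unhexlify(lines[-1])  # last non-empty line holds the hex payload
>
> reader = pa.ipc.open_stream(pa.BufferReader(raw))
> print(reader.schema)  # raises if the leading bytes are not a readable schema message
> {code}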
>  
> h2. Query Plan at Point of Failure
> Right before the failure, I printed out the explain(True) output.
>  
> {code:java}
> == Parsed Logical Plan ==
> 'Project [structstojson(named_struct(), None) AS key#269, 
> unresolvedalias('accuracy, None), unresolvedalias('areaUnderPR, None), 
> unresolvedalias('areaUnderROC, None), unresolvedalias('confusionMatrix, 
> None), unresolvedalias('count, None), unresolvedalias('f1Score, None), 
> unresolvedalias('f1Score_0, None), unresolvedalias('positiveClassRate, None), 
> unresolvedalias('prCurve, None), unresolvedalias('precision, None), 
> unresolvedalias('precision_0, None), unresolvedalias('predictionRate, None), 
> unresolvedalias('recall, None), unresolvedalias('rocCurve, None), 
> unresolvedalias('specificity, None)]
> +- Aggregate [udf(cast(label#146 as double), cast(prediction#15 as double)) 
> AS accuracy#232, _auc_pr(cast(label#146 as double), cast(probability#16 as 
> double)) AS areaUnderPR#233, udf(cast(label#146 as double), 
> cast(probability#16 as double)) AS areaUnderROC#225, 
> array(array(udf(cast(label#146 as double), cast(prediction#15 as double)), 
> udf(cast(label#146 as double), cast(prediction#15 as double))), 
> array(udf(cast(label#146 as double), cast(prediction#15 as double)), 
> udf(cast(label#146 as double), cast(prediction#15 as double)))) AS 
> confusionMatrix#238, _count(cast(label#146 as double)) AS count#234, 
> udf(cast(label#146 as double), cast(prediction#15 as double)) AS f1Score#231, 
> udf(cast(label#146 as double), cast(prediction#15 as double)) AS 
> f1Score_0#228, _rate(cast(label#146 as double)) AS positiveClassRate#227, 
> named_struct(x, udf(cast(label#146 as double), cast(probability#16 as 
> double)), y, udf(cast(label#146 as double), cast(probability#16 as double))) 
> AS prCurve#230, udf(cast(label#146 as double), cast(prediction#15 as double)) 
> AS precision#236, udf(cast(label#146 as double), cast(prediction#15 as 
> double)) AS precision_0#235, _rate(cast(prediction#15 as double)) AS 
> predictionRate#237, udf(cast(label#146 as double), cast(prediction#15 as 
> double)) AS recall#229, named_struct(x, udf(cast(label#146 as double), 
> cast(probability#16 as double)), y, udf(cast(label#146 as double), 
> cast(probability#16 as double))) AS rocCurve#239, udf(cast(label#146 as 
> double), cast(prediction#15 as double)) AS specificity#226]
>    +- Project [encounterID#13, dim1#11, dim2#12, prediction#15, 
> probability#16, model_id#23, label#146, test AS customer#156, foo AS 
> solution#157, bar AS insight#158, model AS model_name#159, 1.0 AS 
> version#160, model1 AS model_id#161, current_timestamp() AS timestamp#162]
>       +- Project [encounterID#13, dim1#11, dim2#12, prediction#15, 
> probability#16, model_id#23, label#146]
>          +- Join Inner, (encounterID#13 = encounterID#145)
>             :- Project [dim1#11, dim2#12, encounterID#13, prediction#15, 
> probability#16, model_id#23]
>             :  +- Filter ((false || NOT test#40) = false)
>             :     +- Project [dim1#11, dim2#12, encounterID#13, 
> prediction#15, probability#16, model_id#23, (true && (dim1#11 = foo)) AS 
> test#40]
>             :        +- Project [dim1#11, dim2#12, encounterID#13, 
> prediction#15, probability#16, model_id#23]
>             :           +- Join Cross
>             :              :- LogicalRDD [dim1#11, dim2#12, encounterID#13, 
> label#14, prediction#15, probability#16], false
>             :              +- LogicalRDD [model_id#23], false
>             +- Project [encounterID#145, label#146]
>                +- Join Cross
>                   :- LogicalRDD [dim1#143, dim2#144, encounterID#145, 
> label#146, prediction#147, probability#148], false
>                   +- LogicalRDD [model_id#23], false== Analyzed Logical Plan 
> ==
> key: string, accuracy: float, areaUnderPR: float, areaUnderROC: float, 
> confusionMatrix: array<array<int>>, count: int, f1Score: float, f1Score_0: 
> float, positiveClassRate: int, prCurve: 
> struct<x:array<float>,y:array<float>>, precision: float, precision_0: float, 
> predictionRate: int, recall: float, rocCurve: 
> struct<x:array<float>,y:array<float>>, specificity: float
> Project [structstojson(named_struct(), Some(America/Los_Angeles)) AS key#269, 
> accuracy#232, areaUnderPR#233, areaUnderROC#225, confusionMatrix#238, 
> count#234, f1Score#231, f1Score_0#228, positiveClassRate#227, prCurve#230, 
> precision#236, precision_0#235, predictionRate#237, recall#229, rocCurve#239, 
> specificity#226]
> +- Aggregate [udf(cast(label#146 as double), cast(prediction#15 as double)) 
> AS accuracy#232, _auc_pr(cast(label#146 as double), cast(probability#16 as 
> double)) AS areaUnderPR#233, udf(cast(label#146 as double), 
> cast(probability#16 as double)) AS areaUnderROC#225, 
> array(array(udf(cast(label#146 as double), cast(prediction#15 as double)), 
> udf(cast(label#146 as double), cast(prediction#15 as double))), 
> array(udf(cast(label#146 as double), cast(prediction#15 as double)), 
> udf(cast(label#146 as double), cast(prediction#15 as double)))) AS 
> confusionMatrix#238, _count(cast(label#146 as double)) AS count#234, 
> udf(cast(label#146 as double), cast(prediction#15 as double)) AS f1Score#231, 
> udf(cast(label#146 as double), cast(prediction#15 as double)) AS 
> f1Score_0#228, _rate(cast(label#146 as double)) AS positiveClassRate#227, 
> named_struct(x, udf(cast(label#146 as double), cast(probability#16 as 
> double)), y, udf(cast(label#146 as double), cast(probability#16 as double))) 
> AS prCurve#230, udf(cast(label#146 as double), cast(prediction#15 as double)) 
> AS precision#236, udf(cast(label#146 as double), cast(prediction#15 as 
> double)) AS precision_0#235, _rate(cast(prediction#15 as double)) AS 
> predictionRate#237, udf(cast(label#146 as double), cast(prediction#15 as 
> double)) AS recall#229, named_struct(x, udf(cast(label#146 as double), 
> cast(probability#16 as double)), y, udf(cast(label#146 as double), 
> cast(probability#16 as double))) AS rocCurve#239, udf(cast(label#146 as 
> double), cast(prediction#15 as double)) AS specificity#226]
>    +- Project [encounterID#13, dim1#11, dim2#12, prediction#15, 
> probability#16, model_id#23, label#146, test AS customer#156, foo AS 
> solution#157, bar AS insight#158, model AS model_name#159, 1.0 AS 
> version#160, model1 AS model_id#161, current_timestamp() AS timestamp#162]
>       +- Project [encounterID#13, dim1#11, dim2#12, prediction#15, 
> probability#16, model_id#23, label#146]
>          +- Join Inner, (encounterID#13 = encounterID#145)
>             :- Project [dim1#11, dim2#12, encounterID#13, prediction#15, 
> probability#16, model_id#23]
>             :  +- Filter ((false || NOT test#40) = false)
>             :     +- Project [dim1#11, dim2#12, encounterID#13, 
> prediction#15, probability#16, model_id#23, (true && (dim1#11 = foo)) AS 
> test#40]
>             :        +- Project [dim1#11, dim2#12, encounterID#13, 
> prediction#15, probability#16, model_id#23]
>             :           +- Join Cross
>             :              :- LogicalRDD [dim1#11, dim2#12, encounterID#13, 
> label#14, prediction#15, probability#16], false
>             :              +- LogicalRDD [model_id#23], false
>             +- Project [encounterID#145, label#146]
>                +- Join Cross
>                   :- LogicalRDD [dim1#143, dim2#144, encounterID#145, 
> label#146, prediction#147, probability#148], false
>                   +- LogicalRDD [model_id#23], false== Optimized Logical Plan 
> ==
> Aggregate [{} AS key#269, udf(label#146, prediction#15) AS accuracy#232, 
> _auc_pr(label#146, probability#16) AS areaUnderPR#233, udf(label#146, 
> probability#16) AS areaUnderROC#225, array(array(udf(label#146, 
> prediction#15), udf(label#146, prediction#15)), array(udf(label#146, 
> prediction#15), udf(label#146, prediction#15))) AS confusionMatrix#238, 
> _count(label#146) AS count#234, udf(label#146, prediction#15) AS f1Score#231, 
> udf(label#146, prediction#15) AS f1Score_0#228, _rate(label#146) AS 
> positiveClassRate#227, named_struct(x, udf(label#146, probability#16), y, 
> udf(label#146, probability#16)) AS prCurve#230, udf(label#146, prediction#15) 
> AS precision#236, udf(label#146, prediction#15) AS precision_0#235, 
> _rate(prediction#15) AS predictionRate#237, udf(label#146, prediction#15) AS 
> recall#229, named_struct(x, udf(label#146, probability#16), y, udf(label#146, 
> probability#16)) AS rocCurve#239, udf(label#146, prediction#15) AS 
> specificity#226]
> +- Project [prediction#15, probability#16, label#146]
>    +- Join Inner, (encounterID#13 = encounterID#145)
>       :- Project [encounterID#13, prediction#15, probability#16]
>       :  +- Filter ((isnotnull(test#40) && (NOT test#40 = false)) && 
> isnotnull(encounterID#13))
>       :     +- InMemoryRelation [dim1#11, dim2#12, encounterID#13, 
> prediction#15, probability#16, model_id#23, test#40], StorageLevel(disk, 
> memory, deserialized, 1 replicas)
>       :           +- *(2) Project [dim1#11, dim2#12, encounterID#13, 
> prediction#15, probability#16, model_id#23, (dim1#11 = foo) AS test#40]
>       :              +- CartesianProduct
>       :                 :- *(1) Project [dim1#11, dim2#12, encounterID#13, 
> prediction#15, probability#16]
>       :                 :  +- Scan 
> ExistingRDD[dim1#11,dim2#12,encounterID#13,label#14,prediction#15,probability#16]
>       :                 +- Scan ExistingRDD[model_id#23]
>       +- Join Cross
>          :- Project [encounterID#145, label#146]
>          :  +- Filter isnotnull(encounterID#145)
>          :     +- LogicalRDD [dim1#143, dim2#144, encounterID#145, label#146, 
> prediction#147, probability#148], false
>          +- Project
>             +- LogicalRDD [model_id#23], false== Physical Plan ==
> !AggregateInPandas [udf(label#146, prediction#15), _auc_pr(label#146, 
> probability#16), udf(label#146, probability#16), udf(label#146, 
> prediction#15), udf(label#146, prediction#15), udf(label#146, prediction#15), 
> udf(label#146, prediction#15), _count(label#146), udf(label#146, 
> prediction#15), udf(label#146, prediction#15), _rate(label#146), 
> udf(label#146, probability#16), udf(label#146, probability#16), 
> udf(label#146, prediction#15), udf(label#146, prediction#15), 
> _rate(prediction#15), udf(label#146, prediction#15), udf(label#146, 
> probability#16), udf(label#146, probability#16), udf(label#146, 
> prediction#15)], [{} AS key#269, udf(label, prediction)#201 AS accuracy#232, 
> _auc_pr(label, probability)#209 AS areaUnderPR#233, udf(label, 
> probability)#208 AS areaUnderROC#225, array(array(udf(label, prediction)#213, 
> udf(label, prediction)#214), array(udf(label, prediction)#215, udf(label, 
> prediction)#216)) AS confusionMatrix#238, _count(label)#210 AS count#234, 
> udf(label, prediction)#206 AS f1Score#231, udf(label, prediction)#207 AS 
> f1Score_0#228, _rate(label)#212 AS positiveClassRate#227, named_struct(x, 
> udf(label, probability)#219, y, udf(label, probability)#220) AS prCurve#230, 
> udf(label, prediction)#202 AS precision#236, udf(label, prediction)#203 AS 
> precision_0#235, _rate(prediction)#211 AS predictionRate#237, udf(label, 
> prediction)#204 AS recall#229, named_struct(x, udf(label, probability)#217, 
> y, udf(label, probability)#218) AS rocCurve#239, udf(label, prediction)#205 
> AS specificity#226]
> +- Exchange SinglePartition
>    +- *(4) Project [prediction#15, probability#16, label#146]
>       +- *(4) BroadcastHashJoin [encounterID#13], [encounterID#145], Inner, 
> BuildLeft
>          :- BroadcastExchange HashedRelationBroadcastMode(List(input[0, 
> string, true]))
>          :  +- *(1) Project [encounterID#13, prediction#15, probability#16]
>          :     +- *(1) Filter ((isnotnull(test#40) && (NOT test#40 = false)) 
> && isnotnull(encounterID#13))
>          :        +- InMemoryTableScan [encounterID#13, prediction#15, 
> probability#16, test#40], [isnotnull(test#40), (NOT test#40 = false), 
> isnotnull(encounterID#13)]
>          :              +- InMemoryRelation [dim1#11, dim2#12, 
> encounterID#13, prediction#15, probability#16, model_id#23, test#40], 
> StorageLevel(disk, memory, deserialized, 1 replicas)
>          :                    +- *(2) Project [dim1#11, dim2#12, 
> encounterID#13, prediction#15, probability#16, model_id#23, (dim1#11 = foo) 
> AS test#40]
>          :                       +- CartesianProduct
>          :                          :- *(1) Project [dim1#11, dim2#12, 
> encounterID#13, prediction#15, probability#16]
>          :                          :  +- Scan 
> ExistingRDD[dim1#11,dim2#12,encounterID#13,label#14,prediction#15,probability#16]
>          :                          +- Scan ExistingRDD[model_id#23]
>          +- CartesianProduct
>             :- *(2) Project [encounterID#145, label#146]
>             :  +- *(2) Filter isnotnull(encounterID#145)
>             :     +- Scan 
> ExistingRDD[dim1#143,dim2#144,encounterID#145,label#146,prediction#147,probability#148]
>             +- *(3) Project
>                +- Scan ExistingRDD[model_id#23]
> {code}
> h2. Related Bugs
> I have a related bug I've run into where the schema in the input Arrow 
> message was transmitted incorrectly. In that case, the input schema should 
> have been <long, float, long> but was transmitted as <long, long, float>. As 
> a result, the float column was reinterpreted as a long (equivalent C code to 
> illustrate the behavior):
> {code:java}
> // read the double's raw bits back as a (64-bit) integer
> long reinterpret(double floating_point_number) {
>   return *(long *)(&floating_point_number);
> }
> {code}
> I got around this bug by making all 3 columns float and converting them to 
> long within the UDF via Pandas Series.apply(np.int). Strangely, a 
> Column.astype('float') didn't seem to have an effect; I had to make them 
> float at the source.
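> A hypothetical sketch of that workaround (column names are placeholders):
> {code:python}
> # Make every input column a double at the source, then convert back to integers
> # inside the UDF body instead of relying on the Arrow schema for integer columns.
> import numpy as np
> from pyspark.sql import functions as F
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> df2 = (df
>        .withColumn("label", F.col("label").cast("double"))
>        .withColumn("prediction", F.col("prediction").cast("double")))
>
> @pandas_udf("long", PandasUDFType.GROUPED_AGG)
> def positive_count(label):
>     return int((label.apply(np.int) == 1).sum())  # back to ints on the pandas side
> {code}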
> Along the way, I had trouble with [Python's dict keys being 
> non-deterministic|https://stackoverflow.com/questions/14956313/why-is-dictionary-ordering-non-deterministic].
> This led to columns being passed to GroupedData.agg() in different orders for 
> each worker and driver process. I've mitigated this by explicitly ordering 
> the columns before sending them to agg(). I don't think this is an issue 
> anymore, but I'm calling it out just in case.
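> The ordering mitigation amounts to something like this (metric_udfs is a 
> hypothetical name-to-UDF mapping):
> {code:python}
> # Sort the metric names so driver and workers build the agg() expression list in
> # the same, deterministic order regardless of dict iteration order.
> from pyspark.sql import functions as F
>
> exprs = [metric_udfs[name](F.col("label"), F.col("prediction")).alias(name)
>          for name in sorted(metric_udfs)]
> result = df.groupBy().agg(*exprs)
> {code}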
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
