[ https://issues.apache.org/jira/browse/SPARK-30063?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Tim Kellogg updated SPARK-30063: -------------------------------- Description: I have 20 Pandas UDFs that I'm trying to evaluate all at the same time. * PandasUDFType.GROUPED_AGG * 3 columns in the input data frame being serialized over Arrow to Python worker. See below for clarification. * All functions take 2 parameters, some combination of the 3 received as Arrow input. * Varying return types, see details below. _*I get an IllegalArgumentException on the Scala side of the worker when deserializing from Python.*_ h2. Exception & Stack Trace {code:java} 19/11/27 11:38:36 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5) java.lang.IllegalArgumentException at java.nio.ByteBuffer.allocate(ByteBuffer.java:334) at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543) at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58) at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132) at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181) at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172) at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 19/11/27 11:38:36 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException at java.nio.ByteBuffer.allocate(ByteBuffer.java:334) at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543) at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58) at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132) at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181) at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172) at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410) at 
org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} h2. Input Arrow Schema I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out schema & message. This is the input, in load_stream, the code is print(batch, batch.schema, file=log_file) {code:java} <pyarrow.lib.RecordBatch object at 0x10640ecc8> _0: double _1: double _2: double metadata -------- OrderedDict() {code} h2. Output Arrow Schema I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out schema & message. 
This is the output, in dump_stream, the code is print(batch, batch.schema, file=log_file) {code:java} <pyarrow.lib.RecordBatch object at 0x11ad5b638> _0: float _1: float _2: float _3: int32 _4: int32 _5: int32 _6: int32 _7: int32 _8: float _9: float _10: int32 _11: list<item: float> child 0, item: float _12: list<item: float> child 0, item: float _13: float _14: float _15: int32 _16: float _17: list<item: float> child 0, item: float _18: list<item: float> child 0, item: float _19: float {code} h2. Arrow Message I edited ArrowPythonReader.scala at line 163 to print out the Arrow message. Debug code: {code:java} val fw = new java.io.FileWriter("spark-debug.txt", true) try { val buf = new Array[Byte](40000) stream.read(buf) fw.write(s"Spark reader\n") for (b <- buf) { fw.write(String.format("%02x", Byte.box(b))) } fw.write(s"\n") } finally fw.close() {code} Debug output (some trailing 0's included for completeness). {code:java} ffffffff900400001000000000000a000c000600050008000a000000000103000c000000080008000000040008000000040000001400000030040000f4030000c803000090030000600300003003000000030000d0020000a40200007802000048020000ec01000094010000680100003c0100000c010000e000000088000000300000000400000030fcffff00000103180000000c00000004000000000000001efcffff00000100030000005f31390058fcffff0000010c440000001000000004000000010000000800000060feffff78fcffff00000103180000000c000000040000000000000066fcffff00000100040000006974656d00000000030000005f313800acfcffff0000010c4400000010000000040000000100000008000000b4feffffccfcffff00000103180000000c0000000400000000000000bafcffff00000100040000006974656d00000000030000005f31370000fdffff00000103180000000c0000000400000000000000eefcffff00000100030000005f31360028fdffff000001021c0000000c0000000400000000000000acfdffff0000000120000000030000005f31350054fdffff00000103180000000c000000040000000000000042fdffff00000100030000005f3134007cfdffff00000103180000000c00000004000000000000006afdffff00000100030000005f313300a4fdffff0000010c44000000100000000400000001000
00008000000acffffffc4fdffff00000103180000000c0000000400000000000000b2fdffff00000100040000006974656d00000000030000005f313200f8fdffff0000010c480000001400000004000000010000000c00000004000400040000001cfeffff00000103180000000c00000004000000000000000afeffff00000100040000006974656d00000000030000005f31310050feffff000001021c0000000c0000000400000000000000d4feffff0000000120000000030000005f3130007cfeffff00000103180000000c00000004000000000000006afeffff00000100020000005f390000a4feffff00000103180000000c000000040000000000000092feffff00000100020000005f380000ccfeffff000001021c0000000c000000040000000000000050ffffff0000000120000000020000005f370000f8feffff000001021c0000000c00000004000000000000007cffffff0000000120000000020000005f36000024ffffff000001021c0000000c0000000400000000000000a8ffffff0000000120000000020000005f35000050ffffff000001021c0000000c0000000400000000000000d4ffffff0000000120000000020000005f3400007cffffff000001022400000014000000040000000000000008000c0008000700080000000000000120000000020000005f330000b0ffffff00000103180000000c00000004000000000000009effffff00000100020000005f320000d8ffffff00000103180000000c0000000400000000000000c6ffffff00000100020000005f310000100014000800060007000c000000100010000000000001032000000014000000040000000000000000000600080006000600000000000100020000005f300000ffffffffd804000014000000000000000c0016000600050008000c000c0000000003030018000000e00000000000000000000a0018000c00040008000a0000001c030000100000000100000000000000000000003000000000000000000000000000000000000000000000000000000008000000000000000800000000000000000000000000000008000000000000000800000000000000100000000000000000000000000000001000000000000000080000000000000018000000000000000000000000000000180000000000000008000000000000002000000000000000000000000000000020000000000000000800000000000000280000000000000000000000000000002800000000000000080000000000000030000000000000000000000000000000300000000000000008000000000000003800000000000000000000000000000038000000000000000800000000000000400000000000000000000
0000000000040000000000000000800000000000000480000000000000000000000000000004800000000000000080000000000000050000000000000000000000000000000500000000000000008000000000000005800000000000000000000000000000058000000000000000800000000000000600000000000000000000000000000006000000000000000100000000000000070000000000000000000000000000000700000000000000008000000000000007800000000000000000000000000000078000000000000001000000000000000880000000000000000000000000000008800000000000000080000000000000090000000000000000000000000000000900000000000000008000000000000009800000000000000000000000000000098000000000000000800000000000000a0000000000000000000000000000000a0000000000000000800000000000000a8000000000000000000000000000000a8000000000000000800000000000000b0000000000000000000000000000000b0000000000000001000000000000000c0000000000000000000000000000000c0000000000000000800000000000000c8000000000000000000000000000000c8000000000000001000000000000000d8000000000000000000000000000000d800000000000000080000000000000000000000180000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000300000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000003000000000000000000000000000000010000000000000000000000000000000000003f000000000000203f000000000000003f00000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000abaa2a3f00000000320000000000000000000000030000000000803f0000003f00000
0000000000000000000030000000000003f0000003f0000803f0000000000000000000000000000003f00000000200000000000000000000000000000000000000003000000000000000000003f0000803f000000000000000003000000000000000000003f0000803f000000000000803f00000000ffffffff00000000fffffffd0000016eae25ec840000016eae25f77b0000016eae25f7ac00000000000000000000000000000000ffffffff00000000fffffffc0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 {code} h2. Related Bugs I have a related bug that I've gotten where the schema in the input Arrow message was transmiitted incorrectly. In that case, the input schema should have been <long, float, long> but was transmitted as <long, long, float>. 
As a result, the float column was interpreted as a long. Equivalent C code to illustrate the behavior: {code:java} long reinterpret(double floating_point_number) { return *(long*)(&floating_point_number); } {code} I worked around this bug by making all 3 columns float and converting them to long within the UDF via Pandas Series.apply(np.int). Strangely, Column.astype('float') didn't seem to have any effect; I had to make the columns float at the source. Along the way, I had trouble with [Python's dict keys being non-deterministic|https://stackoverflow.com/questions/14956313/why-is-dictionary-ordering-non-deterministic]. This led to columns being passed to GroupedData.agg() in a different order in each worker and driver process. I've mitigated this by explicitly ordering the columns before sending them to agg. I don't think this is an issue anymore, but I'm calling it out just in case. was: I have 20 Pandas UDFs that I'm trying to evaluate all at the same time. * PandasUDFType.GROUPED_AGG * 3 columns in the input data frame being serialized over Arrow to Python worker. See below for clarification. * All functions take 2 parameters, some combination of the 3 received as Arrow input. * Varying return types, see details below. _*I get an IllegalArgumentException on the Scala side of the worker when deserializing from Python.*_ h2. 
Exception & Stack Trace {code:java} 19/11/27 11:38:36 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5) java.lang.IllegalArgumentException at java.nio.ByteBuffer.allocate(ByteBuffer.java:334) at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543) at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58) at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132) at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181) at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172) at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at 
org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 19/11/27 11:38:36 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException at java.nio.ByteBuffer.allocate(ByteBuffer.java:334) at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543) at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58) at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132) at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181) at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172) at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162) at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122) at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at 
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:123) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) {code} h2. Input Arrow Schema I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out schema & message. This is the input, in load_stream, the code is print(batch, batch.schema, file=log_file) {code:java} <pyarrow.lib.RecordBatch object at 0x10640ecc8> _0: double _1: double _2: double metadata -------- OrderedDict() {code} h2. Output Arrow Schema I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out schema & message. This is the output, in dump_stream, the code is print(batch, batch.schema, file=log_file) {code:java} <pyarrow.lib.RecordBatch object at 0x11ad5b638> _0: float _1: float _2: float _3: int32 _4: int32 _5: int32 _6: int32 _7: int32 _8: float _9: float _10: int32 _11: list<item: float> child 0, item: float _12: list<item: float> child 0, item: float _13: float _14: float _15: int32 _16: float _17: list<item: float> child 0, item: float _18: list<item: float> child 0, item: float _19: float {code} h2. Arrow Message I edited ArrowPythonReader.scala at line 163 to print out the Arrow message. 
Debug code: {code:java} val fw = new java.io.FileWriter("spark-debug.txt", true) try { val buf = new Array[Byte](40000) stream.read(buf) fw.write(s"Spark reader\n") for (b <- buf) { fw.write(String.format("%02x", Byte.box(b))) } fw.write(s"\n") } finally fw.close() {code} Debug output (some trailing 0's included for completeness). {code:java} ffffffff900400001000000000000a000c000600050008000a000000000103000c000000080008000000040008000000040000001400000030040000f4030000c803000090030000600300003003000000030000d0020000a40200007802000048020000ec01000094010000680100003c0100000c010000e000000088000000300000000400000030fcffff00000103180000000c00000004000000000000001efcffff00000100030000005f31390058fcffff0000010c440000001000000004000000010000000800000060feffff78fcffff00000103180000000c000000040000000000000066fcffff00000100040000006974656d00000000030000005f313800acfcffff0000010c4400000010000000040000000100000008000000b4feffffccfcffff00000103180000000c0000000400000000000000bafcffff00000100040000006974656d00000000030000005f31370000fdffff00000103180000000c0000000400000000000000eefcffff00000100030000005f31360028fdffff000001021c0000000c0000000400000000000000acfdffff0000000120000000030000005f31350054fdffff00000103180000000c000000040000000000000042fdffff00000100030000005f3134007cfdffff00000103180000000c00000004000000000000006afdffff00000100030000005f313300a4fdffff0000010c4400000010000000040000000100000008000000acffffffc4fdffff00000103180000000c0000000400000000000000b2fdffff00000100040000006974656d00000000030000005f313200f8fdffff0000010c480000001400000004000000010000000c00000004000400040000001cfeffff00000103180000000c00000004000000000000000afeffff00000100040000006974656d00000000030000005f31310050feffff000001021c0000000c0000000400000000000000d4feffff0000000120000000030000005f3130007cfeffff00000103180000000c00000004000000000000006afeffff00000100020000005f390000a4feffff00000103180000000c000000040000000000000092feffff00000100020000005f380000ccfeffff000001021c0000000c000000040000000000000
050ffffff0000000120000000020000005f370000f8feffff000001021c0000000c00000004000000000000007cffffff0000000120000000020000005f36000024ffffff000001021c0000000c0000000400000000000000a8ffffff0000000120000000020000005f35000050ffffff000001021c0000000c0000000400000000000000d4ffffff0000000120000000020000005f3400007cffffff000001022400000014000000040000000000000008000c0008000700080000000000000120000000020000005f330000b0ffffff00000103180000000c00000004000000000000009effffff00000100020000005f320000d8ffffff00000103180000000c0000000400000000000000c6ffffff00000100020000005f310000100014000800060007000c000000100010000000000001032000000014000000040000000000000000000600080006000600000000000100020000005f300000ffffffffd804000014000000000000000c0016000600050008000c000c0000000003030018000000e00000000000000000000a0018000c00040008000a0000001c030000100000000100000000000000000000003000000000000000000000000000000000000000000000000000000008000000000000000800000000000000000000000000000008000000000000000800000000000000100000000000000000000000000000001000000000000000080000000000000018000000000000000000000000000000180000000000000008000000000000002000000000000000000000000000000020000000000000000800000000000000280000000000000000000000000000002800000000000000080000000000000030000000000000000000000000000000300000000000000008000000000000003800000000000000000000000000000038000000000000000800000000000000400000000000000000000000000000004000000000000000080000000000000048000000000000000000000000000000480000000000000008000000000000005000000000000000000000000000000050000000000000000800000000000000580000000000000000000000000000005800000000000000080000000000000060000000000000000000000000000000600000000000000010000000000000007000000000000000000000000000000070000000000000000800000000000000780000000000000000000000000000007800000000000000100000000000000088000000000000000000000000000000880000000000000008000000000000009000000000000000000000000000000090000000000000000800000000000000980000000000000000000000000000009800000
0000000000800000000000000a0000000000000000000000000000000a0000000000000000800000000000000a8000000000000000000000000000000a8000000000000000800000000000000b0000000000000000000000000000000b0000000000000001000000000000000c0000000000000000000000000000000c0000000000000000800000000000000c8000000000000000000000000000000c8000000000000001000000000000000d8000000000000000000000000000000d800000000000000080000000000000000000000180000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000300000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000003000000000000000000000000000000010000000000000000000000000000000000003f000000000000203f000000000000003f00000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000abaa2a3f00000000320000000000000000000000030000000000803f0000003f000000000000000000000000030000000000003f0000003f0000803f0000000000000000000000000000003f00000000200000000000000000000000000000000000000003000000000000000000003f0000803f000000000000000003000000000000000000003f0000803f000000000000803f00000000ffffffff00000000fffffffd0000016eae25ec840000016eae25f77b0000016eae25f7ac00000000000000000000000000000000ffffffff00000000fffffffc000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
0000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 {code} h2. Related Bugs I have a related bug that I've gotten where the schema in the input Arrow message was transmiitted incorrectly. In that case, the input schema should have been <long, float, long> but was transmitted as <long, long, float>. As a result, the float column was interpreted as a long (equivalent C code to illustrate behavior: ) {code:java} long reinterpret(double floating_point_number) { return *(long*)(&floating_point_number) } {code} I got around this bug by making all 3 columns float and converting them to long within the UDF via Pandas Series.apply(np.int) Along the way, I had trouble with [Python's dict keys being non-deterministic|[https://stackoverflow.com/questions/14956313/why-is-dictionary-ordering-non-deterministic].] This led columns being passed to GroupedData.agg() in different orders for each worker and driver process. 
I've mitigated this by explicitly ordering the columns before sending them to agg. I don't think this is an issue anymore, but I'm calling it out just in case. > Failure when returning a value from multiple Pandas UDFs > -------------------------------------------------------- > > Key: SPARK-30063 > URL: https://issues.apache.org/jira/browse/SPARK-30063 > Project: Spark > Issue Type: Bug > Components: PySpark > Affects Versions: 2.4.3, 2.4.4 > Environment: Happens on Mac & Ubuntu (Docker). Seems to happen on > both 2.4.3 and 2.4.4 > Reporter: Tim Kellogg > Priority: Major > Attachments: spark-debug.txt > > > I have 20 Pandas UDFs that I'm trying to evaluate all at the same time. > * PandasUDFType.GROUPED_AGG > * 3 columns in the input data frame being serialized over Arrow to Python > worker. See below for clarification. > * All functions take 2 parameters, some combination of the 3 received as > Arrow input. > * Varying return types, see details below. > _*I get an IllegalArgumentException on the Scala side of the worker when > deserializing from Python.*_ > h2. 
Exception & Stack Trace
> {code:java}
> 19/11/27 11:38:36 ERROR Executor: Exception in task 0.0 in stage 5.0 (TID 5)
> java.lang.IllegalArgumentException
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
>     at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
>     at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
>     at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
>     at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
>     at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
>     at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
>     at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
>     at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> 19/11/27 11:38:36 WARN TaskSetManager: Lost task 0.0 in stage 5.0 (TID 5, localhost, executor driver): java.lang.IllegalArgumentException
>     at java.nio.ByteBuffer.allocate(ByteBuffer.java:334)
>     at org.apache.arrow.vector.ipc.message.MessageSerializer.readMessage(MessageSerializer.java:543)
>     at org.apache.arrow.vector.ipc.message.MessageChannelReader.readNext(MessageChannelReader.java:58)
>     at org.apache.arrow.vector.ipc.ArrowStreamReader.readSchema(ArrowStreamReader.java:132)
>     at org.apache.arrow.vector.ipc.ArrowReader.initialize(ArrowReader.java:181)
>     at org.apache.arrow.vector.ipc.ArrowReader.ensureInitialized(ArrowReader.java:172)
>     at org.apache.arrow.vector.ipc.ArrowReader.getVectorSchemaRoot(ArrowReader.java:65)
>     at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:162)
>     at org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$1.read(ArrowPythonRunner.scala:122)
>     at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:410)
>     at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
>     at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>     at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
>     at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
>     at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
>     at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
>     at org.apache.spark.scheduler.Task.run(Task.scala:123)
>     at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
>     at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>     at java.lang.Thread.run(Thread.java:748)
> {code}
> h2. Input Arrow Schema
> I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out
> schema & message. This is the input, in load_stream, the code is
> print(batch, batch.schema, file=log_file)
> {code:java}
> <pyarrow.lib.RecordBatch object at 0x10640ecc8>
> _0: double
> _1: double
> _2: double
> metadata
> --------
> OrderedDict()
> {code}
> h2. Output Arrow Schema
> I edited ArrowStreamPandasSerializer in pyspark/serializers.py to print out
> schema & message.
This is the output, in dump_stream, the code is
> print(batch, batch.schema, file=log_file)
> {code:java}
> <pyarrow.lib.RecordBatch object at 0x11ad5b638>
> _0: float
> _1: float
> _2: float
> _3: int32
> _4: int32
> _5: int32
> _6: int32
> _7: int32
> _8: float
> _9: float
> _10: int32
> _11: list<item: float>
>   child 0, item: float
> _12: list<item: float>
>   child 0, item: float
> _13: float
> _14: float
> _15: int32
> _16: float
> _17: list<item: float>
>   child 0, item: float
> _18: list<item: float>
>   child 0, item: float
> _19: float
> {code}
> h2. Arrow Message
> I edited ArrowPythonRunner.scala at line 163 to print out the Arrow message.
> Debug code:
> {code:java}
> // Append a hex dump of the start of the Python worker's output stream.
> val fw = new java.io.FileWriter("spark-debug.txt", true)
> try {
>   val buf = new Array[Byte](40000)
>   // read() may fill only part of buf; any unread bytes stay 0.
>   stream.read(buf)
>   fw.write(s"Spark reader\n")
>   for (b <- buf) {
>     fw.write(String.format("%02x", Byte.box(b)))
>   }
>   fw.write(s"\n")
> } finally fw.close()
> {code}
> Debug output (some trailing 0's included for completeness).
> {code:java} > ffffffff900400001000000000000a000c000600050008000a000000000103000c000000080008000000040008000000040000001400000030040000f4030000c803000090030000600300003003000000030000d0020000a40200007802000048020000ec01000094010000680100003c0100000c010000e000000088000000300000000400000030fcffff00000103180000000c00000004000000000000001efcffff00000100030000005f31390058fcffff0000010c440000001000000004000000010000000800000060feffff78fcffff00000103180000000c000000040000000000000066fcffff00000100040000006974656d00000000030000005f313800acfcffff0000010c4400000010000000040000000100000008000000b4feffffccfcffff00000103180000000c0000000400000000000000bafcffff00000100040000006974656d00000000030000005f31370000fdffff00000103180000000c0000000400000000000000eefcffff00000100030000005f31360028fdffff000001021c0000000c0000000400000000000000acfdffff0000000120000000030000005f31350054fdffff00000103180000000c000000040000000000000042fdffff00000100030000005f3134007cfdffff00000103180000000c00000004000000000000006afdffff00000100030000005f313300a4fdffff0000010c4400000010000000040000000100000008000000acffffffc4fdffff00000103180000000c0000000400000000000000b2fdffff00000100040000006974656d00000000030000005f313200f8fdffff0000010c480000001400000004000000010000000c00000004000400040000001cfeffff00000103180000000c00000004000000000000000afeffff00000100040000006974656d00000000030000005f31310050feffff000001021c0000000c0000000400000000000000d4feffff0000000120000000030000005f3130007cfeffff00000103180000000c00000004000000000000006afeffff00000100020000005f390000a4feffff00000103180000000c000000040000000000000092feffff00000100020000005f380000ccfeffff000001021c0000000c000000040000000000000050ffffff0000000120000000020000005f370000f8feffff000001021c0000000c00000004000000000000007cffffff0000000120000000020000005f36000024ffffff000001021c0000000c0000000400000000000000a8ffffff0000000120000000020000005f35000050ffffff000001021c0000000c0000000400000000000000d4ffffff0000000120000000020000005f3400007cffffff0000010224000000
14000000040000000000000008000c0008000700080000000000000120000000020000005f330000b0ffffff00000103180000000c00000004000000000000009effffff00000100020000005f320000d8ffffff00000103180000000c0000000400000000000000c6ffffff00000100020000005f310000100014000800060007000c000000100010000000000001032000000014000000040000000000000000000600080006000600000000000100020000005f300000ffffffffd804000014000000000000000c0016000600050008000c000c0000000003030018000000e00000000000000000000a0018000c00040008000a0000001c0300001000000001000000000000000000000030000000000000000000000000000000000000000000000000000000080000000000000008000000000000000000000000000000080000000000000008000000000000001000000000000000000000000000000010000000000000000800000000000000180000000000000000000000000000001800000000000000080000000000000020000000000000000000000000000000200000000000000008000000000000002800000000000000000000000000000028000000000000000800000000000000300000000000000000000000000000003000000000000000080000000000000038000000000000000000000000000000380000000000000008000000000000004000000000000000000000000000000040000000000000000800000000000000480000000000000000000000000000004800000000000000080000000000000050000000000000000000000000000000500000000000000008000000000000005800000000000000000000000000000058000000000000000800000000000000600000000000000000000000000000006000000000000000100000000000000070000000000000000000000000000000700000000000000008000000000000007800000000000000000000000000000078000000000000001000000000000000880000000000000000000000000000008800000000000000080000000000000090000000000000000000000000000000900000000000000008000000000000009800000000000000000000000000000098000000000000000800000000000000a0000000000000000000000000000000a0000000000000000800000000000000a8000000000000000000000000000000a8000000000000000800000000000000b0000000000000000000000000000000b0000000000000001000000000000000c0000000000000000000000000000000c0000000000000000800000000000000c8000000000000000000000000000000c800000000000000
1000000000000000d8000000000000000000000000000000d800000000000000080000000000000000000000180000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000300000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000010000000000000000000000000000000100000000000000000000000000000001000000000000000000000000000000030000000000000000000000000000000100000000000000000000000000000003000000000000000000000000000000010000000000000000000000000000000000003f000000000000203f000000000000003f00000000000000000000000000000000000000000000000000000000000000000000000010000000000000000000000000000000abaa2a3f00000000320000000000000000000000030000000000803f0000003f000000000000000000000000030000000000003f0000003f0000803f0000000000000000000000000000003f00000000200000000000000000000000000000000000000003000000000000000000003f0000803f000000000000000003000000000000000000003f0000803f000000000000803f00000000ffffffff00000000fffffffd0000016eae25ec840000016eae25f77b0000016eae25f7ac00000000000000000000000000000000ffffffff00000000fffffffc00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000
00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000 > {code} > > h2. Related Bugs > I have a related bug that I've gotten where the schema in the input Arrow > message was transmiitted incorrectly. In that case, the input schema should > have been <long, float, long> but was transmitted as <long, long, float>. As > a result, the float column was interpreted as a long (equivalent C code to > illustrate behavior: ) > {code:java} > long reinterpret(double floating_point_number) { > return *(long*)(&floating_point_number) > } > {code} > I got around this bug by making all 3 columns float and converting them to > long within the UDF via Pandas Series.apply(np.int). Strangely, a > Column.astype('float') didn't seem to have an effect, I had to make them > float at the source. > Along the way, I had trouble with [Python's dict keys being > non-deterministic|[https://stackoverflow.com/questions/14956313/why-is-dictionary-ordering-non-deterministic].] > This led columns being passed to GroupedData.agg() in different orders for > each worker and driver process. I've mitigated this by explicitly ordering > the columns before sending them to agg. I don't think this is an issue > anymore, but I'm calling it out just in case. 
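The column-ordering mitigation mentioned at the end of Related Bugs could look roughly like the sketch below. This is only an illustration, not the code from the report: the helper `ordered_aggregations`, the `udf_a`/`udf_b`/`udf_c` names, and the string stand-ins for the UDF column expressions are all hypothetical. The idea is simply to derive the order from the sorted names rather than from dict iteration, so that every process builds the same list.

```python
def ordered_aggregations(aggs):
    """Return (name, expr) pairs sorted by output column name.

    Sorting by key makes the order independent of how the dict
    happens to iterate in a given Python process.
    """
    return sorted(aggs.items(), key=lambda kv: kv[0])


# Hypothetical names and expressions standing in for the real UDF columns.
aggs = {
    "udf_b": "b(_0, _1)",
    "udf_a": "a(_1, _2)",
    "udf_c": "c(_0, _2)",
}

pairs = ordered_aggregations(aggs)
names = [name for name, _ in pairs]
exprs = [expr for _, expr in pairs]
print(names)
```

With real PySpark columns in place of the strings, the sorted list would presumably be passed on as something like `grouped.agg(*[expr.alias(name) for name, expr in pairs])`, where `grouped` is an assumed GroupedData object.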