That was spot on! I had 3 columns with 80characters => 80*21*10^6 = 1.56 bytes I removed these columns and replaced each with 10 doubleType columns (so it would still be 80 bytes of data) - and this error didn't come up anymore. I also removed all the other columns and just kept 1 column with 80characters - I got the error again.
I'll make a simpler example and report it to spark - as I guess these columns would need some special handling. Now, when I run - I get a different error: 19/03/01 20:16:49 WARN TaskSetManager: Lost task 108.0 in stage 8.0 (TID 12, ip-172-31-10-249.us-west-2.compute.internal, executor 1): org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/worker.py", line 230, in main process() File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/worker.py", line 225, in process serializer.dump_stream(func(split_index, iterator), outfile) File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/serializers.py", line 260, in dump_stream for series in iterator: File "/mnt/yarn/usercache/hadoop/appcache/application_1551469777576_0010/container_1551469777576_0010_01_000002/pyspark.zip/pyspark/serializers.py", line 279, in load_stream for batch in reader: File "pyarrow/ipc.pxi", line 265, in __iter__ File "pyarrow/ipc.pxi", line 281, in pyarrow.lib._RecordBatchReader.read_next_batch File "pyarrow/error.pxi", line 83, in pyarrow.lib.check_status pyarrow.lib.ArrowIOError: read length must be positive or -1 Again, any pointers on what this means and what it indicates would be really useful for me. Thanks for the replies! On Fri, Mar 1, 2019 at 11:26 PM Uwe L. Korn <uw...@xhochy.com> wrote: > There is currently the limitation that a column in a single RecordBatch > can only hold 2G on the Java side. We work around this by splitting the > DataFrame under the hood into multiple RecordBatches. I'm not familiar with > the Spark<->Arrow code but I guess that in this case, the Spark code can > only handle a single RecordBatch. > > Probably it is best to construct a https://stackoverflow.com/help/mcve > and create an issue with the Spark project. Most likely this is not a bug > in Arrow but just requires a bit more complicated implementation around the > Arrow libs. > > Still, please have a look at the exact size of your columns. We support 2G > per column, if it is only 1.5G, then there is probably a rounding error in > the Arrow. Alternatively, you might also be in luck that the following > patch > https://github.com/apache/arrow/commit/bfe6865ba8087a46bd7665679e48af3a77987cef > which is part of Apache Arrow 0.12 already fixes your problem. > > Uwe > > On Fri, Mar 1, 2019, at 6:48 PM, Abdeali Kothari wrote: > > Is there a limitation that a single column cannot be more than 1-2G ? > > One of my columns definitely would be around 1.5GB of memory. > > > > I cannot split my DF into more partitions as I have only 1 ID and I'm > > grouping by that ID. > > So, the UDAF would only run on a single pandasDF > > I do have a requirement to make a very large DF for this UDAF (8GB as i > > mentioned above) - trying to figure out what I need to do here to make > this > > work. > > Increasing RAM, etc. is no issue (i understand I'd need huge executors > as I > > have a huge data requirement). But trying to figure out how much to > > actually get - cause 20GB of RAM for the executor is also erroring out > > where I thought ~10GB would have been enough > > > > > > > > On Fri, Mar 1, 2019 at 10:25 PM Uwe L. Korn <uw...@xhochy.com> wrote: > > > > > Hello Abdeali, > > > > > > a problem could here be that a single column of your dataframe is using > > > more than 2GB of RAM (possibly also just 1G). Try splitting your > DataFrame > > > into more partitions before applying the UDAF. > > > > > > Cheers > > > Uwe > > > > > > On Fri, Mar 1, 2019, at 9:09 AM, Abdeali Kothari wrote: > > > > I was using arrow with spark+python and when I'm trying some > pandas-UDAF > > > > functions I am getting this error: > > > > > > > > org.apache.arrow.vector.util.OversizedAllocationException: Unable to > > > > expand > > > > the buffer > > > > at > > > > > > > > org.apache.arrow.vector.BaseVariableWidthVector.reallocDataBuffer(BaseVariableWidthVector.java:457) > > > > at > > > > > > > > org.apache.arrow.vector.BaseVariableWidthVector.handleSafe(BaseVariableWidthVector.java:1188) > > > > at > > > > > > > > org.apache.arrow.vector.BaseVariableWidthVector.setSafe(BaseVariableWidthVector.java:1026) > > > > at > > > > > > > > org.apache.spark.sql.execution.arrow.StringWriter.setValue(ArrowWriter.scala:256) > > > > at > > > > > > > > org.apache.spark.sql.execution.arrow.ArrowFieldWriter.write(ArrowWriter.scala:122) > > > > at > > > > > > > > org.apache.spark.sql.execution.arrow.ArrowWriter.write(ArrowWriter.scala:87) > > > > at > > > > > > > > org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply$mcV$sp(ArrowPythonRunner.scala:84) > > > > at > > > > > > > > org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75) > > > > at > > > > > > > > org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2$$anonfun$writeIteratorToStream$1.apply(ArrowPythonRunner.scala:75) > > > > at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1380) > > > > at > > > > > > > > org.apache.spark.sql.execution.python.ArrowPythonRunner$$anon$2.writeIteratorToStream(ArrowPythonRunner.scala:95) > > > > at > > > > > > > > org.apache.spark.api.python.BasePythonRunner$WriterThread$$anonfun$run$1.apply(PythonRunner.scala:215) > > > > at > org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1991) > > > > at > > > > > > > > org.apache.spark.api.python.BasePythonRunner$WriterThread.run(PythonRunner.scala:170) > > > > > > > > I was initially getting a RAM is insufficient error - and > theoretically > > > > (with no compression) realized that the pandas DataFrame it would > try to > > > > create would be ~8GB (21million records with each record having ~400 > > > > bytes). I have increased my executor memory to be 20GB per executor, > but > > > am > > > > now getting this error from Arrow. > > > > Looking for some pointers so I can understand this issue better. > > > > > > > > Here's what I am trying. I have 2 tables with string columns where > the > > > > strings always have a fixed length: > > > > *Table 1*: > > > > id: integer > > > > char_column1: string (length = 30) > > > > char_column2: string (length = 40) > > > > char_column3: string (length = 10) > > > > ... > > > > In total, in table1, the char-columns have ~250 characters > > > > > > > > *Table 2*: > > > > id: integer > > > > char_column1: string (length = 50) > > > > char_column2: string (length = 3) > > > > char_column3: string (length = 4) > > > > ... > > > > In total, in table2, the char-columns have ~150 characters > > > > > > > > I am joining these tables by ID. In my current dataset, I have > filtered > > > my > > > > data so only id=1 exists. > > > > Table1 has ~400 records for id=1 and table2 has 50k records for id=1. > > > > Hence, total number of records (after joining) for table1_join2 = > 400 * > > > 50k > > > > = 20*10^6 records > > > > Each row has ~400bytes (150+250) => overall memory = 8*10^9 bytes => > ~8GB > > > > > > > > Now, when I try an executor with 20GB RAM, it does not work. > > > > Is there some data duplicity happening internally ? What should be > the > > > > estimated RAM I need to give for this to work ? > > > > > > > > Thanks for reading, > > > > > > > > > >