Zheng Shao created SPARK-48380:
----------------------------------

             Summary: AutoBatchedPickler caused Unsafe allocate to fail due to 2GB limit
                 Key: SPARK-48380
                 URL: https://issues.apache.org/jira/browse/SPARK-48380
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 3.5.1
            Reporter: Zheng Shao


The stacktrace:
```
Py4JJavaError: An error occurred while calling o562.saveAsTable.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1811 in stage 8.0 failed 4 times, most recent failure: Lost task 1811.3 in stage 8.0 (TID 2782) (10.251.129.187 executor 70): java.lang.IllegalArgumentException: Cannot grow BufferHolder by size 578595584 because the size after growing exceeds size limitation 2147483632
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:71)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.grow(UnsafeWriter.java:66)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:201)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.writeArray(InterpretedUnsafeProjection.scala:322)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16(InterpretedUnsafeProjection.scala:200)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16$adapted(InterpretedUnsafeProjection.scala:198)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22(InterpretedUnsafeProjection.scala:288)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22$adapted(InterpretedUnsafeProjection.scala:286)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2(InterpretedUnsafeProjection.scala:123)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2$adapted(InterpretedUnsafeProjection.scala:120)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3(InterpretedUnsafeProjection.scala:67)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3$adapted(InterpretedUnsafeProjection.scala:65)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:90)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:36)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:93)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:83)
    at org.apache.spark.api.python.PythonRDD$.writeNextElementToStream(PythonRDD.scala:474)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeNextInputToStream(PythonRunner.scala:885)
    at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:813)
    at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:733)
    at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
    at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
    at java.base/java.io.DataInputStream.readInt(DataInputStream.java:381)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:912)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:904)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:506)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:240)
    at org.apache.spark.sql.execution.SortExec$$anon$2.sortedIterator(SortExec.scala:134)
    at org.apache.spark.sql.execution.SortExec$$anon$2.hasNext(SortExec.scala:148)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:539)
    at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:117)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:933)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:933)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
```



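It's caused by the following code. Consider a partition that starts with 10K 
almost-empty rows, followed by 10K rows of about 1MB each. While the 
almost-empty rows are being pickled, every batch stays under 1MB, so batch 
keeps doubling and can reach several thousand rows. The batch size is only 
adjusted after a full batch has been buffered and pickled, so the first batch 
that reaches the 1MB rows collects thousands of them at once, which adds up to 
several GB and exceeds the 2GB limit.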

```

private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
  private val pickle = new Pickler(/* useMemo = */ true,
    /* valueCompare = */ false)
  private var batch = 1
  private val buffer = new mutable.ArrayBuffer[Any]

  override def hasNext: Boolean = iter.hasNext

  override def next(): Array[Byte] = {
    while (iter.hasNext && buffer.length < batch) {
      buffer += iter.next()
    }
    val bytes = pickle.dumps(buffer.toArray)
    val size = bytes.length
    // let  1M < size < 10M
    if (size < 1024 * 1024) {
      batch *= 2
    } else if (size > 1024 * 1024 * 10 && batch > 1) {
      batch /= 2
    }
    buffer.clear()
    bytes
  }
}

```
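
The growth of batch in this scenario can be illustrated with a small simulation. This is only a sketch and not Spark code: the object name AutoBatchGrowthSim and the simulate helper are hypothetical. It assumes an almost-empty row pickles to about 16 bytes and that the pickled size of a batch is roughly the sum of its row sizes. It tracks sizes as plain numbers, so nothing large is allocated.

```scala
// Simulates only the batch-size bookkeeping of AutoBatchedPickler.next().
// Row sizes stand in for pickled bytes (assumption: batch bytes ~ sum of row bytes).
object AutoBatchGrowthSim {
  val OneMB: Long = 1024L * 1024L

  /** Returns (rows in batch, approximate batch bytes) for each simulated next() call. */
  def simulate(rowSizes: Iterator[Long]): Vector[(Int, Long)] = {
    var batch = 1
    val out = Vector.newBuilder[(Int, Long)]
    while (rowSizes.hasNext) {
      var rows = 0
      var size = 0L
      // Same loop condition as the quoted code: fill up to `batch` rows
      // without looking at how large the rows are.
      while (rowSizes.hasNext && rows < batch) {
        size += rowSizes.next()
        rows += 1
      }
      out += ((rows, size))
      // Same adjustment rule as the quoted code, applied only after the batch is built.
      if (size < OneMB) batch *= 2
      else if (size > 10 * OneMB && batch > 1) batch /= 2
    }
    out.result()
  }

  def main(args: Array[String]): Unit = {
    // Assumption: 10K rows of ~16 bytes, then 10K rows of 1MB.
    val rows = Iterator.fill(10000)(16L) ++ Iterator.fill(10000)(OneMB)
    val batches = simulate(rows)
    val (maxRows, maxBytes) = batches.maxBy(_._2)
    println(s"largest batch: $maxRows rows, about ${maxBytes / OneMB} MB")
    println(s"exceeds Int.MaxValue bytes: ${maxBytes > Int.MaxValue}")
  }
}
```

Under these assumptions, batch has grown to 8192 by the time the 1MB rows start, so a single batch contains several thousand of them before the size check gets a chance to halve batch.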


