[
https://issues.apache.org/jira/browse/SPARK-48380?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Zheng Shao updated SPARK-48380:
-------------------------------
Description:
The stacktrace:
```
Py4JJavaError: An error occurred while calling o562.saveAsTable.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 1811
in stage 8.0 failed 4 times, most recent failure: Lost task 1811.3 in stage 8.0
(TID 2782) (10.251.129.187 executor 70): java.lang.IllegalArgumentException:
Cannot grow BufferHolder by size 578595584 because the size after growing
exceeds size limitation 2147483632
    at org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder.grow(BufferHolder.java:71)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.grow(UnsafeWriter.java:66)
    at org.apache.spark.sql.catalyst.expressions.codegen.UnsafeWriter.write(UnsafeWriter.java:201)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.writeArray(InterpretedUnsafeProjection.scala:322)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16(InterpretedUnsafeProjection.scala:200)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$16$adapted(InterpretedUnsafeProjection.scala:198)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22(InterpretedUnsafeProjection.scala:288)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateFieldWriter$22$adapted(InterpretedUnsafeProjection.scala:286)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2(InterpretedUnsafeProjection.scala:123)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.$anonfun$generateStructWriter$2$adapted(InterpretedUnsafeProjection.scala:120)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3(InterpretedUnsafeProjection.scala:67)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.$anonfun$writer$3$adapted(InterpretedUnsafeProjection.scala:65)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:90)
    at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.apply(InterpretedUnsafeProjection.scala:36)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:93)
    at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.next(SerDeUtil.scala:83)
    at org.apache.spark.api.python.PythonRDD$.writeNextElementToStream(PythonRDD.scala:474)
    at org.apache.spark.api.python.PythonRunner$$anon$2.writeNextInputToStream(PythonRunner.scala:885)
    at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.writeAdditionalInputToPythonWorker(PythonRunner.scala:813)
    at org.apache.spark.api.python.BasePythonRunner$ReaderInputStream.read(PythonRunner.scala:733)
    at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:244)
    at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:263)
    at java.base/java.io.DataInputStream.readInt(DataInputStream.java:381)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:912)
    at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:904)
    at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:506)
    at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:491)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
    at org.apache.spark.sql.execution.UnsafeExternalRowSorter.sort(UnsafeExternalRowSorter.java:240)
    at org.apache.spark.sql.execution.SortExec$$anon$2.sortedIterator(SortExec.scala:134)
    at org.apache.spark.sql.execution.SortExec$$anon$2.hasNext(SortExec.scala:148)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:539)
    at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:117)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:933)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:933)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:60)
    at org.apache.spark.rdd.RDD.$anonfun$computeOrReadCheckpoint$1(RDD.scala:409)
    at com.databricks.spark.util.ExecutorFrameProfiler$.record(ExecutorFrameProfiler.scala:110)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:406)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:373)
    at org.apache.spark.scheduler.ResultTask.$anonfun$runTask$3(ResultTask.scala:82)
```
The root cause is the batch-size heuristic in AutoBatchedPickler (SerDeUtil.scala), shown below. Consider 10K almost-empty rows followed by 10K rows of ~1MB each: while the rows are tiny, every pickled batch stays under 1MB, so batch doubles on every call to next() and has grown into the thousands by the time the large rows arrive. The next batch then pickles thousands of ~1MB rows into a single byte array, several GB in one allocation, which blows past the 2147483632-byte (~2GB) limit. The heuristic only halves batch after an oversized batch has already been materialized, so it cannot prevent the failure.
```
private[spark] class AutoBatchedPickler(iter: Iterator[Any]) extends Iterator[Array[Byte]] {
  private val pickle = new Pickler(/* useMemo = */ true,
    /* valueCompare = */ false)
  private var batch = 1
  private val buffer = new mutable.ArrayBuffer[Any]

  override def hasNext: Boolean = iter.hasNext

  override def next(): Array[Byte] = {
    while (iter.hasNext && buffer.length < batch) {
      buffer += iter.next()
    }
    val bytes = pickle.dumps(buffer.toArray)
    val size = bytes.length
    // let 1M < size < 10M
    if (size < 1024 * 1024) {
      batch *= 2
    } else if (size > 1024 * 1024 * 10 && batch > 1) {
      batch /= 2
    }
    buffer.clear()
    bytes
  }
}
```
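To make the failure mode concrete, here is a small, self-contained simulation of the batching arithmetic above. It is only a sketch: the row sizes (10 bytes standing in for "almost empty", 1MB for the large rows) are assumptions taken from the scenario in this report, not measurements.
```
// Simulates AutoBatchedPickler's batch-size doubling on the workload above.
object BatchGrowthSimulation {
  def main(args: Array[String]): Unit = {
    // 10K almost-empty rows (~10 bytes pickled), then 10K rows of ~1MB each.
    val rowSizes = Iterator.fill(10000)(10L) ++ Iterator.fill(10000)(1024L * 1024)
    var batch = 1
    var maxBatchBytes = 0L
    while (rowSizes.hasNext) {
      // Like next(): take up to `batch` rows into one pickled payload.
      var taken = 0
      var bytes = 0L
      while (rowSizes.hasNext && taken < batch) {
        bytes += rowSizes.next()
        taken += 1
      }
      maxBatchBytes = math.max(maxBatchBytes, bytes)
      // The same grow/shrink heuristic as in AutoBatchedPickler.
      if (bytes < 1024 * 1024) batch *= 2
      else if (bytes > 1024 * 1024 * 10 && batch > 1) batch /= 2
    }
    // `batch` doubles to 8192 while the tiny rows last; the first batch that
    // spans the 1MB rows then holds over 6 GB in a single payload, far past
    // the 2147483632-byte limit from the stacktrace.
    println(s"max single-batch payload: $maxBatchBytes bytes")
  }
}
```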
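One possible direction for a fix, offered only as a sketch and not a committed change: bound each batch by estimated bytes as well as by row count, so an inflated batch can never materialize an oversized payload. Here sizeOf and encode are hypothetical stand-ins for a per-row size estimate and for pickling, and the 64MB cap is an arbitrary illustration.
```
import scala.collection.mutable

// Sketch: a batching iterator that stops filling a batch once the estimated
// payload size crosses a byte bound, instead of only reacting after encoding.
class SizeBoundedBatcher[T](
    iter: Iterator[T],
    sizeOf: T => Long,              // hypothetical per-row size estimate
    encode: Seq[T] => Array[Byte],  // hypothetical stand-in for pickling
    maxBatchBytes: Long = 64L * 1024 * 1024) extends Iterator[Array[Byte]] {
  private var batch = 1
  private val buffer = new mutable.ArrayBuffer[T]

  override def hasNext: Boolean = iter.hasNext

  override def next(): Array[Byte] = {
    var estimated = 0L
    // Stop at the byte bound even if `batch` (the row-count bound) is huge.
    while (iter.hasNext && buffer.length < batch && estimated < maxBatchBytes) {
      val row = iter.next()
      estimated += sizeOf(row)
      buffer += row
    }
    val bytes = encode(buffer.toSeq)
    // Keep the original adaptive heuristic for the row-count bound.
    if (bytes.length < 1024 * 1024) batch *= 2
    else if (bytes.length > 1024 * 1024 * 10 && batch > 1) batch /= 2
    buffer.clear()
    bytes
  }
}
```
The grow/shrink heuristic is kept as-is; the byte bound merely acts as a hard cap on top of it, so throughput on uniform workloads should be unchanged.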
> AutoBatchedPickler caused Unsafe allocate to fail due to 2GB limit
> ------------------------------------------------------------------
>
> Key: SPARK-48380
> URL: https://issues.apache.org/jira/browse/SPARK-48380
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 3.5.1
> Reporter: Zheng Shao
> Priority: Major
>