[ 
https://issues.apache.org/jira/browse/SPARK-26116?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pierre Lienhart updated SPARK-26116:
------------------------------------
    Description: 
When writing partitioned parquet using {{partitionBy}}, it looks like Spark 
sorts each partition before writing, but this sort consumes a huge amount of 
memory compared to the size of the data. The executors can then go OOM and get 
killed by YARN. As a consequence, we have to provision a huge amount of 
memory relative to the data being written.

Error messages found in the Spark UI look like the following:

{code:java}
Spark UI description of failure : Job aborted due to stage failure: Task 169 in 
stage 2.0 failed 1 times, most recent failure: Lost task 169.0 in stage 2.0 
(TID 98, xxxxxxxxx.xxxxxx.xxxxx.xx, executor 1): ExecutorLostFailure (executor 
1 exited caused by one of the running tasks) Reason: Container killed by YARN 
for exceeding memory limits. 8.1 GB of 8 GB physical memory used. Consider 
boosting spark.yarn.executor.memoryOverhead.
{code}
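
For context, the 8 GB limit in this message is the YARN container size, that is, the executor heap plus the off-heap overhead. Below is a minimal sketch of that arithmetic, assuming the documented Spark 2.x default for {{spark.yarn.executor.memoryOverhead}} (10% of executor memory, with a 384 MiB floor). The 7 GiB heap is a hypothetical value, not the configuration of the job reported here, and YARN may additionally round the request up to its allocation increment.

```python
# Sketch: size of the YARN container requested for one executor.
# Assumes the Spark 2.x defaults for spark.yarn.executor.memoryOverhead;
# the heap size used in the demo below is hypothetical.
OVERHEAD_FACTOR = 0.10   # default overhead as a fraction of executor memory
OVERHEAD_MIN_MIB = 384   # default minimum overhead in MiB


def default_overhead_mib(executor_memory_mib):
    """Overhead applied when spark.yarn.executor.memoryOverhead is unset."""
    return max(int(executor_memory_mib * OVERHEAD_FACTOR), OVERHEAD_MIN_MIB)


def container_request_mib(executor_memory_mib, overhead_mib=None):
    """Heap plus overhead: the physical-memory limit YARN enforces."""
    if overhead_mib is None:
        overhead_mib = default_overhead_mib(executor_memory_mib)
    return executor_memory_mib + overhead_mib


if __name__ == "__main__":
    heap_mib = 7 * 1024  # hypothetical --executor-memory 7g
    print("overhead (MiB):", default_overhead_mib(heap_mib))
    print("container request (MiB):", container_request_mib(heap_mib))
```

Anything the executor process allocates outside the JVM heap has to fit into the overhead part, which is why the message suggests boosting it.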
 
{code:java}
Job aborted due to stage failure: Task 66 in stage 4.0 failed 1 times, most 
recent failure: Lost task 66.0 in stage 4.0 (TID 56, xxxxxxx.xxxxx.xxxxx.xx, 
executor 1): org.apache.spark.SparkException: Task failed while writing rows
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:204)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:129)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1$$anonfun$3.apply(FileFormatWriter.scala:128)
         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
         at org.apache.spark.scheduler.Task.run(Task.scala:99)
         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
         at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.OutOfMemoryError: error while calling spill() on org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter@75194804 : /app/hadoop/yarn/local/usercache/at053351/appcache/application_1537536072724_17039/blockmgr-a4ba7d59-e780-4385-99b4-a4c4fe95a1ec/25/temp_local_a542a412-5845-45d2-9302-bbf5ee4113ad (No such file or directory)
         at org.apache.spark.memory.TaskMemoryManager.acquireExecutionMemory(TaskMemoryManager.java:188)
         at org.apache.spark.memory.TaskMemoryManager.allocatePage(TaskMemoryManager.java:254)
         at org.apache.spark.memory.MemoryConsumer.allocateArray(MemoryConsumer.java:92)
         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.growPointerArrayIfNecessary(UnsafeExternalSorter.java:347)
         at org.apache.spark.util.collection.unsafe.sort.UnsafeExternalSorter.insertKVRecord(UnsafeExternalSorter.java:425)
         at org.apache.spark.sql.execution.UnsafeKVExternalSorter.insertKV(UnsafeKVExternalSorter.java:160)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$DynamicPartitionWriteTask.execute(FileFormatWriter.scala:364)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:190)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:188)
         at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1353)
         at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:193)
         ... 8 more
{code}
 
In the stderr logs, we can see that a huge amount of sort data is being 
spilled to disk ({{INFO UnsafeExternalSorter: Thread 155 spilling sort data 
of 3.6 GB to disk}}), even though the partition being sorted here is only 
250 MB when persisted into memory, deserialized. Sometimes the data is spilled 
to disk in time and the sort completes ({{INFO FileFormatWriter: Sorting 
complete. Writing out partition files one at a time.}}), but sometimes it is 
not, and we see multiple 
{{TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.}} 
messages until the application finally runs OOM with logs such as {{ERROR 
UnsafeExternalSorter: Unable to grow the pointer array}}.
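
To quantify the mismatch described above, the spill messages can be summed per executor log and compared with the partition size. The following is a rough sketch in plain Python: the regular expression is modelled on the single log line quoted above, and the binary interpretation of the units (1 GB = 1024^3 bytes) is an assumption about how Spark 2.1 formats sizes. The helper name {{spilled_bytes}} is made up for this illustration.

```python
import re

# Binary multipliers; assumed to match how Spark 2.1 prints sizes in logs.
_UNITS = {"B": 1, "KB": 1024, "MB": 1024**2, "GB": 1024**3, "TB": 1024**4}

# Pattern modelled on the UnsafeExternalSorter line quoted in this report.
_SPILL_RE = re.compile(r"spilling sort data of ([\d.]+) (B|KB|MB|GB|TB) to disk")


def spilled_bytes(log_lines):
    """Sum the sizes reported by all spill messages found in log_lines."""
    total = 0.0
    for line in log_lines:
        match = _SPILL_RE.search(line)
        if match:
            total += float(match.group(1)) * _UNITS[match.group(2)]
    return total


if __name__ == "__main__":
    sample = [
        "INFO UnsafeExternalSorter: Thread 155 spilling sort data of 3.6 GB to disk",
    ]
    partition_bytes = 250 * 1024**2  # in-memory partition size from the report
    ratio = spilled_bytes(sample) / partition_bytes
    print("spill is %.1fx the partition size" % ratio)
    # The page size in the TaskMemoryManager message, expressed in MiB.
    print("failed page allocation: %d MiB" % (67108864 // 1024**2))
```

Running it over a full stderr file (for example {{spilled_bytes(open(path))}}, with {{path}} pointing at a downloaded executor log) gives the total spilled by that executor.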

I should mention that when looking at individual (successful) write tasks in 
the Spark UI, the Peak Execution Memory metric is always 0.  

This looks like a known issue: SPARK-12546 is explicitly related and led to a 
PR that decreased the default value of 
{{spark.sql.sources.maxConcurrentWrites}} from 5 to 1. The [Spark 1.6.0 release 
notes|https://spark.apache.org/releases/spark-release-1-6-0.html] also mention 
this problem as a “Known Issue” and, as described in SPARK-12546, advise 
tweaking both {{spark.memory.fraction}} and 
{{spark.hadoop.parquet.memory.pool.ratio}}, without any explanation of how 
this should help (although the recommended values do indeed help).
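
My current understanding of what {{spark.memory.fraction}} controls can be written down as a small calculation. This is only a sketch based on the memory-management section of the Spark tuning guide: it assumes 300 MiB of reserved heap and the Spark 2.x default fraction of 0.6, and the per-task bounds assume no cached blocks are occupying the region. The function names and the heap size are hypothetical.

```python
# Sketch of Spark's unified memory region (execution + storage).
# Assumptions: 300 MiB reserved heap, spark.memory.fraction = 0.6 by default
# (Spark 2.x), and no cached blocks taking space away from execution.
RESERVED_MIB = 300


def unified_memory_mib(heap_mib, memory_fraction=0.6):
    """Heap managed by Spark for execution and storage, in MiB."""
    return (heap_mib - RESERVED_MIB) * memory_fraction


def per_task_bounds_mib(heap_mib, running_tasks, memory_fraction=0.6):
    """Each of N running tasks can get between 1/(2N) and 1/N of the pool."""
    pool = unified_memory_mib(heap_mib, memory_fraction)
    return pool / (2 * running_tasks), pool / running_tasks


if __name__ == "__main__":
    heap_mib = 4396  # hypothetical executor heap
    for fraction in (0.6, 0.4):
        low, high = per_task_bounds_mib(heap_mib, 4, fraction)
        unmanaged = heap_mib - unified_memory_mib(heap_mib, fraction)
        print("fraction=%.1f: per-task %.0f-%.0f MiB, outside Spark's pool: %.0f MiB"
              % (fraction, low, high, unmanaged))
```

If this model is right, lowering the fraction shrinks what the sorter may hold before spilling and leaves more heap for allocations Spark does not track, which may be why the recommended values help. Confirmation in the documentation would be welcome.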

Could we at least enhance the documentation on this issue? It would be really 
helpful for me to understand what is happening in terms of memory so that I can 
better size my application and/or choose the most appropriate memory 
parameters. Still, why does the sort generate that much data?

I am running Spark 2.1.1 and do not know whether I would encounter this issue 
in later versions.

Many thanks,

Pierre LIENHART


> Spark SQL - Sort when writing partitioned parquet leads to OOM errors
> ---------------------------------------------------------------------
>
>                 Key: SPARK-26116
>                 URL: https://issues.apache.org/jira/browse/SPARK-26116
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 2.1.1
>            Reporter: Pierre Lienhart
>            Priority: Major
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
