[
https://issues.apache.org/jira/browse/PARQUET-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14575783#comment-14575783
]
Cheng Lian edited comment on PARQUET-222 at 6/10/15 3:54 PM:
-------------------------------------------------------------
There are several ways to alleviate this.
Firstly, for DataFrames whose data size is small (e.g., the single-row case
[~phatak.dev] mentioned), you may try {{df.coalesce(1)}} to reduce the
partition number to 1, so that only a single file is written. In most cases,
the default parallelism equals the number of cores. For example, if you are
running a Spark application with a single executor on a single 8-core node,
that executor process needs to write 8 Parquet files even if there's only a
single row.
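The arithmetic above can be sketched as a toy model (not Spark itself; the function name is made up for illustration): the number of Parquet files tracks the partition count, and {{df.coalesce(1)}} brings it down to one.

```python
import math

def parquet_files_per_executor(num_partitions, num_executors):
    """Toy model: each partition becomes one Parquet file, and
    partitions are spread evenly across executor processes."""
    return math.ceil(num_partitions / num_executors)

# Single executor on an 8-core node: default parallelism is 8,
# so 8 files are written even for a single-row DataFrame.
print(parquet_files_per_executor(8, 1))   # 8

# After df.coalesce(1) the DataFrame has one partition: one file.
print(parquet_files_per_executor(1, 1))   # 1
```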
Secondly, when you are writing large DataFrames, you may adjust the DataFrame
partition number (via {{df.repartition(m)}} and/or {{df.coalesce(m)}}) and the
executor number (via the {{--num-executors}} flag of {{spark-submit}}) to
ensure the former is less than or equal to the latter, so that each executor
process only opens and writes one Parquet file on average. And of course, the
heap size of a single executor should be large enough to allow Parquet to
write at least a single file.
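For instance, the second approach might look like the following sketch (the application name, resource sizes, output path, and the partition count 8 are placeholders, not values from this issue):

```
# Submit with at least as many executors as output partitions
# (--num-executors applies when running on YARN):
spark-submit --num-executors 8 --executor-memory 4g my_app.py

# ...and inside the application, before writing:
#   df.repartition(8).saveAsParquetFile("hdfs:///path/to/output")
```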
> parquet writer runs into OOM during writing when calling
> DataFrame.saveAsParquetFile in Spark SQL
> -------------------------------------------------------------------------------------------------
>
> Key: PARQUET-222
> URL: https://issues.apache.org/jira/browse/PARQUET-222
> Project: Parquet
> Issue Type: Bug
> Components: parquet-mr
> Affects Versions: 1.6.0
> Reporter: Chaozhong Yang
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> In Spark SQL, there is a function {{saveAsParquetFile}} in {{DataFrame}} or
> {{SchemaRDD}}. That function calls into parquet-mr, and sometimes it fails
> due to an OOM error thrown by parquet-mr. We can see the exception stack
> trace as follows:
> {noformat}
> [WARN] [task-result-getter-3] 03-19 11:17:58,274 [TaskSetManager] - Lost task 0.2 in stage 137.0 (TID 309, hb1.avoscloud.com): java.lang.OutOfMemoryError: Java heap space
>     at parquet.column.values.dictionary.IntList.initSlab(IntList.java:87)
>     at parquet.column.values.dictionary.IntList.<init>(IntList.java:83)
>     at parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:85)
>     at parquet.column.values.dictionary.DictionaryValuesWriter$PlainIntegerDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:549)
>     at parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:88)
>     at parquet.column.impl.ColumnWriterImpl.<init>(ColumnWriterImpl.java:74)
>     at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:68)
>     at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
>     at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
>     at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
>     at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
>     at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
>     at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
>     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
>     at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>     at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:304)
>     at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
>     at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
>     at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>     at org.apache.spark.scheduler.Task.run(Task.scala:56)
>     at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>     at java.lang.Thread.run(Thread.java:662)
> {noformat}
> By the way, there is another similar issue:
> https://issues.apache.org/jira/browse/PARQUET-99. But the reporter has closed
> it and marked it as resolved.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)