[ 
https://issues.apache.org/jira/browse/PARQUET-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14580765#comment-14580765
 ] 

Cheng Lian edited comment on PARQUET-222 at 6/10/15 4:37 PM:
-------------------------------------------------------------

Hey [~rdblue], it seems that you are referring to use cases like writing to 
Hive dynamic partitions (where a single task may need to write multiple 
Parquet files according to partition column values)?  I believe the use case 
described in the JIRA description is different.  Unlike Hive or Pig, which 
use process-level parallelism, Spark uses thread-level parallelism (tasks are 
executed in a thread pool), and currently there's no way to "pin" a Spark 
task to a specific process.  So even if each task is guaranteed to write at 
most one Parquet file, a single executor process may still write multiple 
Parquet files at some point.  So within the scope of Spark, there currently 
isn't a very good mechanism to fix this problem.  What I suggested was 
essentially to shrink the partition number so that on average an executor 
writes only a single file (I made a mistake in my previous comment and said 
"at most one Parquet file"; it should be "on average").
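
To make the "on average one file per executor" suggestion concrete, here is a 
back-of-the-envelope sketch (plain Python, not Parquet's actual memory 
accounting): each open ParquetRecordWriter buffers roughly one row group 
({{parquet.block.size}}, 128 MB by default in parquet-mr) on the heap, so the 
number of concurrently open writers multiplies the footprint. The function 
name is hypothetical, for illustration only.

```python
def estimated_write_heap_mb(concurrent_writers, block_size_mb=128):
    """Rough heap needed just for Parquet write buffers: each open writer
    buffers about one row group (parquet.block.size) in memory."""
    return concurrent_writers * block_size_mb

# 8 tasks per executor, each with its own open writer: ~1 GB of buffers alone.
print(estimated_write_heap_mb(8))  # 1024
# Shrink partitions so each executor writes ~1 file on average: ~128 MB.
print(estimated_write_heap_mb(1))  # 128
```

This is why reducing the partition count (and hence the number of writers 
active at once per executor) directly reduces OOM pressure, even though the 
total data written is unchanged.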

In the case of dynamic partitioning, we do plan to re-partition the data 
according to partition column values before writing, to reduce the number of 
parallel writers.  Another possible approach is to sort the data within each 
task before writing, so that only a single writer is active in a task at any 
point in time.
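
The sort-before-write idea can be sketched in a few lines of plain Python 
(ordinary text files standing in for Parquet writers; the helper name and 
file layout are made up for illustration): sorting by the partition key lets 
each task stream through its rows with at most one writer open at a time.

```python
import itertools
import os
import tempfile

def write_sorted_partitions(rows, key, out_dir):
    """Sort rows by partition key, then write each partition's rows in turn,
    so at most one writer is ever open at any point in time."""
    for part, group in itertools.groupby(sorted(rows, key=key), key=key):
        path = os.path.join(out_dir, f"part={part}.txt")
        with open(path, "w") as writer:  # the single active writer
            for row in group:
                writer.write(f"{row}\n")

rows = [("b", 1), ("a", 2), ("b", 3), ("a", 4)]
out = tempfile.mkdtemp()
write_sorted_partitions(rows, key=lambda r: r[0], out_dir=out)
print(sorted(os.listdir(out)))  # ['part=a.txt', 'part=b.txt']
```

With real Parquet writers the same pattern bounds per-task memory to a single 
writer's row-group buffer, instead of one buffer per dynamic partition.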



> parquet writer runs into OOM during writing when calling 
> DataFrame.saveAsParquetFile in Spark SQL
> -------------------------------------------------------------------------------------------------
>
>                 Key: PARQUET-222
>                 URL: https://issues.apache.org/jira/browse/PARQUET-222
>             Project: Parquet
>          Issue Type: Bug
>          Components: parquet-mr
>    Affects Versions: 1.6.0
>            Reporter: Chaozhong Yang
>   Original Estimate: 336h
>  Remaining Estimate: 336h
>
> In Spark SQL, there is a function {{saveAsParquetFile}} in {{DataFrame}} or 
> {{SchemaRDD}}. That function calls method in parquet-mr, and sometimes it 
> will fail due to the OOM error thrown by parquet-mr. We can see the exception 
> stack trace  as follows:
> {noformat}
> WARN] [task-result-getter-3] 03-19 11:17:58,274 [TaskSetManager] - Lost task 
> 0.2 in stage 137.0 (TID 309, hb1.avoscloud.com): java.lang.OutOfMemoryError: 
> Java heap space
>         at parquet.column.values.dictionary.IntList.initSlab(IntList.java:87)
>         at parquet.column.values.dictionary.IntList.<init>(IntList.java:83)
>         at 
> parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:85)
>         at 
> parquet.column.values.dictionary.DictionaryValuesWriter$PlainIntegerDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:549)
>         at 
> parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:88)
>         at 
> parquet.column.impl.ColumnWriterImpl.<init>(ColumnWriterImpl.java:74)
>         at 
> parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:68)
>         at 
> parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
>         at 
> parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
>         at 
> parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
>         at 
> parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
>         at 
> parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
>         at 
> parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
>         at 
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
>         at 
> parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
>         at 
> org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:304)
>         at 
> org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
>         at 
> org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
>         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>         at 
> org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
>         at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
>         at java.lang.Thread.run(Thread.java:662)
> {noformat}
> By the way, there is another similar issue 
> https://issues.apache.org/jira/browse/PARQUET-99, but the reporter has 
> closed it and marked it as resolved.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
