[
https://issues.apache.org/jira/browse/PARQUET-222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14580765#comment-14580765
]
Cheng Lian edited comment on PARQUET-222 at 6/10/15 4:37 PM:
-------------------------------------------------------------
Hey [~rdblue], it seems that you are referring to use cases like writing to
Hive dynamic partitions (where a single task may need to write multiple
Parquet files according to partition column values)? I believe the use case
described in the JIRA description is different. Unlike Hive or Pig, which use
process-level parallelism, Spark uses thread-level parallelism (tasks are
executed in a thread pool), and currently there's no way to "pin" a Spark
task to a specific process. So even if each task is guaranteed to write at
most one Parquet file, it's still possible for a single executor process to
be writing multiple Parquet files at some point. So within the scope of
Spark, there currently isn't a good mechanism to fix this problem. What I
suggested was essentially to shrink the number of partitions so that on
average an executor writes only a single file (I made a mistake in my
previous comment and said "at most one Parquet file"; it should be "on
average").
In the case of dynamic partitioning, we do plan to re-partition the data
according to partition column values before writing, to reduce the number of
parallel writers. Another possible approach is to sort the data within each
task before writing, so that only a single writer is active per task at any
point in time.
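The sort-based approach can be sketched outside Spark: if the records reaching a task are sorted by partition key, the task can stream through them and keep at most one writer open at a time, instead of one buffering writer per partition value. The sketch below is a minimal, hypothetical Python illustration of that pattern, not Spark's actual implementation; `open_writer` and `ListWriter` are stand-ins for opening a real Parquet file per dynamic partition.

```python
from itertools import groupby
from operator import itemgetter

def write_sorted_records(records, open_writer):
    """Write (partition_key, value) pairs, keeping at most one writer open.

    `records` must already be sorted by partition key -- the same
    precondition the sort-before-write approach establishes per task.
    `open_writer` is a hypothetical factory standing in for opening a
    Parquet file for one dynamic partition.
    """
    files_written = 0
    for key, group in groupby(records, key=itemgetter(0)):
        writer = open_writer(key)  # the only writer alive right now
        for _, value in group:
            writer.append(value)
        writer.close()             # release its buffers before the
        files_written += 1         # next partition's writer opens
    return files_written

# Tiny in-memory stand-in for a Parquet writer.
class ListWriter:
    def __init__(self, sink, key):
        self.rows = sink.setdefault(key, [])
    def append(self, value):
        self.rows.append(value)
    def close(self):
        pass

out = {}
records = sorted([("b", 2), ("a", 1), ("b", 3), ("a", 0)])
n = write_sorted_records(records, lambda k: ListWriter(out, k))
```

With the input sorted, each partition key forms one contiguous run, so peak memory is bounded by a single writer's buffers rather than by the number of distinct partition values a task happens to see.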
> parquet writer runs into OOM during writing when calling DataFrame.saveAsParquetFile in Spark SQL
> -------------------------------------------------------------------------------------------------
>
> Key: PARQUET-222
> URL: https://issues.apache.org/jira/browse/PARQUET-222
> Project: Parquet
> Issue Type: Bug
> Components: parquet-mr
> Affects Versions: 1.6.0
> Reporter: Chaozhong Yang
> Original Estimate: 336h
> Remaining Estimate: 336h
>
> In Spark SQL, there is a function {{saveAsParquetFile}} in {{DataFrame}} or
> {{SchemaRDD}}. That function calls into parquet-mr, and sometimes it fails
> due to an OOM error thrown by parquet-mr. We can see the exception stack
> trace as follows:
> {noformat}
> WARN] [task-result-getter-3] 03-19 11:17:58,274 [TaskSetManager] - Lost task 0.2 in stage 137.0 (TID 309, hb1.avoscloud.com): java.lang.OutOfMemoryError: Java heap space
> at parquet.column.values.dictionary.IntList.initSlab(IntList.java:87)
> at parquet.column.values.dictionary.IntList.<init>(IntList.java:83)
> at parquet.column.values.dictionary.DictionaryValuesWriter.<init>(DictionaryValuesWriter.java:85)
> at parquet.column.values.dictionary.DictionaryValuesWriter$PlainIntegerDictionaryValuesWriter.<init>(DictionaryValuesWriter.java:549)
> at parquet.column.ParquetProperties.getValuesWriter(ParquetProperties.java:88)
> at parquet.column.impl.ColumnWriterImpl.<init>(ColumnWriterImpl.java:74)
> at parquet.column.impl.ColumnWriteStoreImpl.newMemColumn(ColumnWriteStoreImpl.java:68)
> at parquet.column.impl.ColumnWriteStoreImpl.getColumnWriter(ColumnWriteStoreImpl.java:56)
> at parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.<init>(MessageColumnIO.java:178)
> at parquet.io.MessageColumnIO.getRecordWriter(MessageColumnIO.java:369)
> at parquet.hadoop.InternalParquetRecordWriter.initStore(InternalParquetRecordWriter.java:108)
> at parquet.hadoop.InternalParquetRecordWriter.<init>(InternalParquetRecordWriter.java:94)
> at parquet.hadoop.ParquetRecordWriter.<init>(ParquetRecordWriter.java:64)
> at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:282)
> at parquet.hadoop.ParquetOutputFormat.getRecordWriter(ParquetOutputFormat.java:252)
> at org.apache.spark.sql.parquet.InsertIntoParquetTable.org$apache$spark$sql$parquet$InsertIntoParquetTable$$writeShard$1(ParquetTableOperations.scala:304)
> at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
> at org.apache.spark.sql.parquet.InsertIntoParquetTable$$anonfun$saveAsHadoopFile$1.apply(ParquetTableOperations.scala:325)
> at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
> at org.apache.spark.scheduler.Task.run(Task.scala:56)
> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
> at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
> at java.lang.Thread.run(Thread.java:662)
> {noformat}
> By the way, there is another similar issue,
> https://issues.apache.org/jira/browse/PARQUET-99, but the reporter has
> closed it and marked it as resolved.
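The top of the stack trace shows the OOM occurring while {{DictionaryValuesWriter}} pre-allocates its {{IntList}} slab, i.e. per-column dictionary buffers created eagerly when a record writer is opened. A commonly tried mitigation (a sketch of knobs, not a confirmed fix for this issue) is to shrink parquet-mr's per-writer memory footprint through its Hadoop configuration properties; the values below are illustrative, not tuned:

{noformat}
# Hadoop/Parquet job configuration -- illustrative values only
parquet.enable.dictionary=false   # skip dictionary encoding (avoids IntList slabs)
parquet.block.size=67108864       # 64 MB row groups instead of the default 128 MB
parquet.page.size=524288          # smaller pages -> smaller in-memory write buffers
{noformat}

Note this only lowers the per-writer cost; with many writers open concurrently in one executor, the aggregate buffering can still exhaust the heap.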
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)