[
https://issues.apache.org/jira/browse/SPARK-4073?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15111148#comment-15111148
]
Juliet Hougland commented on SPARK-4073:
----------------------------------------
I have run into a related problem. I am reading a Snappy-compressed Parquet
file via PySpark and get the following OOM error:
16/01/21 11:43:10 ERROR util.SparkUncaughtExceptionHandler: Uncaught exception in thread Thread[stdout writer for /opt/anaconda2/bin/python,5,main]
java.lang.OutOfMemoryError: Direct buffer memory
	at java.nio.Bits.reserveMemory(Bits.java:658)
	at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
	at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
	at parquet.hadoop.codec.SnappyDecompressor.setInput(SnappyDecompressor.java:102)
	at parquet.hadoop.codec.NonBlockedDecompressorStream.read(NonBlockedDecompressorStream.java:46)
	at java.io.DataInputStream.readFully(DataInputStream.java:195)
	at java.io.DataInputStream.readFully(DataInputStream.java:169)
	at parquet.bytes.BytesInput$StreamBytesInput.toByteArray(BytesInput.java:204)
	at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:89)
	at parquet.column.values.dictionary.PlainValuesDictionary$PlainBinaryDictionary.<init>(PlainValuesDictionary.java:72)
	at parquet.column.Encoding$1.initDictionary(Encoding.java:89)
	at parquet.column.Encoding$4.initDictionary(Encoding.java:148)
	at parquet.column.impl.ColumnReaderImpl.<init>(ColumnReaderImpl.java:337)
	at parquet.column.impl.ColumnReadStoreImpl.newMemColumnReader(ColumnReadStoreImpl.java:66)
	at parquet.column.impl.ColumnReadStoreImpl.getColumnReader(ColumnReadStoreImpl.java:61)
	at parquet.io.RecordReaderImplementation.<init>(RecordReaderImplementation.java:270)
	at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:134)
	at parquet.io.MessageColumnIO$1.visit(MessageColumnIO.java:99)
	at parquet.filter2.compat.FilterCompat$NoOpFilter.accept(FilterCompat.java:154)
	at parquet.io.MessageColumnIO.getRecordReader(MessageColumnIO.java:99)
	at parquet.hadoop.InternalParquetRecordReader.checkRead(InternalParquetRecordReader.java:135)
	at parquet.hadoop.InternalParquetRecordReader.nextKeyValue(InternalParquetRecordReader.java:205)
	at parquet.hadoop.ParquetRecordReader.nextKeyValue(ParquetRecordReader.java:201)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:143)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:39)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$14.hasNext(Iterator.scala:388)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.hasNext(SerDeUtil.scala:119)
	at scala.collection.Iterator$class.foreach(Iterator.scala:727)
	at org.apache.spark.api.python.SerDeUtil$AutoBatchedPickler.foreach(SerDeUtil.scala:114)
	at org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:405)
	at org.apache.spark.api.python.PythonRDD$WriterThread$$anonfun$run$1.apply(PythonRDD.scala:243)
	at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1617)
	at org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:205)
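The top frames show the allocation that fails: ByteBuffer.allocateDirect, which Parquet's SnappyDecompressor uses for its input/output buffers. A minimal standalone sketch (not Spark or Parquet code) of the distinction between heap and direct buffers:

```java
import java.nio.ByteBuffer;

public class DirectBufferDemo {
    public static void main(String[] args) {
        // Heap buffer: backed by a byte[] on the Java heap, counted
        // against -Xmx, reclaimed by the normal garbage collector.
        ByteBuffer heap = ByteBuffer.allocate(4 * 1024 * 1024);

        // Direct buffer: native memory outside the heap, capped by
        // -XX:MaxDirectMemorySize. The native memory is released only
        // when the DirectByteBuffer object itself is collected, so with
        // little heap pressure these buffers can accumulate until the
        // cap is hit and allocateDirect throws
        // java.lang.OutOfMemoryError: Direct buffer memory.
        ByteBuffer direct = ByteBuffer.allocateDirect(4 * 1024 * 1024);

        System.out.println(heap.isDirect());   // false
        System.out.println(direct.isDirect()); // true
    }
}
```

This is why the error above is not cured by raising -Xmx: the failing allocation is outside the heap entirely.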
> Parquet+Snappy can cause significant off-heap memory usage
> ----------------------------------------------------------
>
> Key: SPARK-4073
> URL: https://issues.apache.org/jira/browse/SPARK-4073
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 1.1.0
> Reporter: Patrick Wendell
> Priority: Critical
>
> The parquet snappy codec allocates off-heap buffers for decompression[1]. In
> one case the observed size of these buffers was high enough to add several
> GB of data to the overall virtual memory usage of the Spark executor process.
> I don't understand enough about our use of Snappy to fully grok how much data
> we would _expect_ to be present in these buffers at any given time, but I can
> say a few things.
> 1. The dataset had individual rows that were fairly large, e.g. megabytes.
> 2. Direct buffers are not cleaned up until GC events, and overall there was
> not much heap contention. So maybe they just weren't being cleaned.
> I opened PARQUET-118 to see if they can provide an option to use on-heap
> buffers for decompression. In the meantime, we could consider changing the
> default back to gzip, or we could do nothing (not sure how many other users
> will hit this).
> [1]
> https://github.com/apache/incubator-parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java#L28
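Until an on-heap option exists, the usual workaround is to give the executor JVM more room for direct buffers. A hedged sketch for a Spark 1.x YARN deployment (the values and the job file name are illustrative, not from this issue):

```shell
# Raise the direct-memory cap seen by ByteBuffer.allocateDirect, and the
# YARN container overhead so the container is not killed for exceeding
# its memory limit. Sizes here are placeholders; tune to the workload.
spark-submit \
  --conf spark.yarn.executor.memoryOverhead=2048 \
  --conf "spark.executor.extraJavaOptions=-XX:MaxDirectMemorySize=2g" \
  my_job.py
```

Writing new files with gzip instead of snappy (the default-change discussed above) avoids the direct-buffer path for readers of those files, but does not help when reading existing snappy-compressed data.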
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)