[
https://issues.apache.org/jira/browse/PARQUET-118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16030817#comment-16030817
]
SuYan commented on PARQUET-118:
-------------------------------
We ran into a case where Spark + parquet-hadoop consumed a lot of unexpected off-heap memory. The cause is that the DirectByteBuffers, which are only phantom-reachable, are not cleaned even after many full GCs; I have no idea why they are not released after a full GC. After we called DirectBuffer.cleaner().clean() directly, the off-heap usage went back to normal.
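For reference, a minimal sketch of the explicit clean we used. It relies on the JDK-internal sun.nio.ch.DirectBuffer / sun.misc.Cleaner API (JDK 8-era runtimes), and the DirectBufferCleaner helper name is just for illustration:
{code}
import java.nio.ByteBuffer;

// Sketch only (JDK 8 internal API): explicitly release the native memory behind
// a direct ByteBuffer instead of waiting for its PhantomReference/Cleaner to be
// processed after a GC.
final class DirectBufferCleaner {
  static void clean(ByteBuffer buffer) {
    if (buffer != null && buffer.isDirect()) {
      sun.misc.Cleaner cleaner = ((sun.nio.ch.DirectBuffer) buffer).cleaner();
      if (cleaner != null) {
        cleaner.clean();  // frees the off-heap allocation immediately
      }
    }
  }
}
{code}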
The related code, from SnappyDecompressor.java:
{code}
@Override
public synchronized void setInput(byte[] buffer, int off, int len) {
  SnappyUtil.validateBuffer(buffer, off, len);
  if (inputBuffer.capacity() - inputBuffer.position() < len) {
    ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
    inputBuffer.rewind();
    newBuffer.put(inputBuffer);
    inputBuffer = newBuffer;
    // add oldBuffer.cleaner.clean() here
  } else {
    inputBuffer.limit(inputBuffer.position() + len);
  }
  inputBuffer.put(buffer, off, len);
}
{code}
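For illustration, the grow path with the explicit clean added could look like this (a sketch, reusing the hypothetical DirectBufferCleaner helper above):
{code}
if (inputBuffer.capacity() - inputBuffer.position() < len) {
  ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
  inputBuffer.rewind();
  newBuffer.put(inputBuffer);
  ByteBuffer oldBuffer = inputBuffer;
  inputBuffer = newBuffer;
  // Release the old direct buffer right away instead of waiting for GC.
  DirectBufferCleaner.clean(oldBuffer);
} else {
  inputBuffer.limit(inputBuffer.position() + len);
}
{code}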
{code}
public synchronized int decompress(byte[] buffer, int off, int len) throws IOException {
  SnappyUtil.validateBuffer(buffer, off, len);
  if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) {
    return 0;
  }

  if (!outputBuffer.hasRemaining()) {
    inputBuffer.rewind();
    Preconditions.checkArgument(inputBuffer.position() == 0, "Invalid position of 0.");
    Preconditions.checkArgument(outputBuffer.position() == 0, "Invalid position of 0.");
    // There is compressed input, decompress it now.
    int decompressedSize = Snappy.uncompressedLength(inputBuffer);
    if (decompressedSize > outputBuffer.capacity()) {
      outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
      // add oldBuffer.cleaner.clean here...
    }
    // Reset the previous outputBuffer (i.e. set position to 0)
    outputBuffer.clear();
    int size = Snappy.uncompress(inputBuffer, outputBuffer);
{code}
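The same pattern would apply to the outputBuffer grow path in decompress(); again just a sketch with the hypothetical helper:
{code}
if (decompressedSize > outputBuffer.capacity()) {
  ByteBuffer oldBuffer = outputBuffer;
  outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
  // Free the old direct buffer immediately instead of leaving it for GC.
  DirectBufferCleaner.clean(oldBuffer);
}
{code}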
what's more, we set the BUFFER_SIZE_CONFIG(io.file.buffer.size)=32K, and the
sequence of compressed data size is like: 32K, 250K, 10MB, 32MB, 64MB....
so the inputBuffer of SnappyDecompressor need increase the size when meet a
input data larger than current SnappyDecompressor inputBuffer, but it increase
step is 32K...so it will called setInput() many times, and then
allocateDirectByteBuffer manyTimes....and a lot of PhantomReference
DirectBuffer....
It may be better to have an adaptive strategy for growing and shrinking the SnappyDecompressor inputBuffer/outputBuffer; a rough sketch of the growth side of such a policy follows.
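For illustration only (ensureCapacity is a hypothetical helper, not existing parquet-hadoop code): grow geometrically so that a stream of 32K inputs does not force a new allocateDirect() on every call; a shrink check could be added symmetrically.
{code}
// Hypothetical helper: grow a direct buffer geometrically to amortize
// reallocations (sketch, not existing parquet-hadoop code).
static ByteBuffer ensureCapacity(ByteBuffer current, int needed) {
  if (needed <= current.capacity()) {
    return current;
  }
  // At least double the capacity so repeated small setInput() calls do not
  // allocate a new direct buffer every time.
  int newCapacity = Math.max(needed, current.capacity() * 2);
  ByteBuffer grown = ByteBuffer.allocateDirect(newCapacity);
  current.flip();       // prepare the written bytes [0, position) for copying
  grown.put(current);   // grown.position() ends up where current.position() was
  return grown;
}
{code}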
> Provide option to use on-heap buffers for Snappy compression/decompression
> --------------------------------------------------------------------------
>
> Key: PARQUET-118
> URL: https://issues.apache.org/jira/browse/PARQUET-118
> Project: Parquet
> Issue Type: Improvement
> Components: parquet-mr
> Affects Versions: 1.6.0
> Reporter: Patrick Wendell
>
> The current code uses direct off-heap buffers for decompression. If many
> decompressors are instantiated across multiple threads, and/or the objects
> being decompressed are large, this can lead to a huge amount of off-heap
> allocation by the JVM. This can be exacerbated if, overall, there is no heap
> contention, since no GC will be performed to reclaim the space used by these
> buffers.
> It would be nice if there were a flag we could use to simply allocate on-heap
> buffers here:
> https://github.com/apache/incubator-parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java#L28
> We ran into an issue today where these buffers totaled a very large amount of
> storage and caused our Java processes (running within containers) to be
> terminated by the kernel OOM-killer.
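For what it's worth, the option requested in the quoted description could look roughly like the sketch below; the useDirectBuffers flag is hypothetical and would need to be wired to a real configuration property:
{code}
// Sketch only: pick heap vs. direct allocation behind a configuration flag.
// The flag is hypothetical; Parquet does not currently expose such a setting.
private ByteBuffer allocateBuffer(int size, boolean useDirectBuffers) {
  return useDirectBuffers
      ? ByteBuffer.allocateDirect(size)  // current behavior: off-heap
      : ByteBuffer.allocate(size);       // proposed option: on-heap, reclaimed by normal GC
}
{code}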