[ https://issues.apache.org/jira/browse/PARQUET-118?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16030817#comment-16030817 ]

SuYan commented on PARQUET-118:
-------------------------------

We are seeing Spark + parquet-hadoop consume a lot of unexpected off-heap memory. 
It is because the direct ByteBuffers, which are only phantom-reachable, are not 
cleaned even after many Full GCs; I have no idea why they are not released after 
a Full GC. After we call DirectBuffer.cleaner().clean() directly, the off-heap 
usage goes back to normal.
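
For reference, this is roughly the explicit clean call we use (a minimal sketch for 
a pre-Java-9 JDK; it relies on sun.misc / sun.nio.ch internals, and the helper class 
name is ours, not parquet-mr's):

{code}
import java.nio.ByteBuffer;

import sun.misc.Cleaner;
import sun.nio.ch.DirectBuffer;

public class DirectBufferCleaner {
  /**
   * Eagerly release the native memory behind a direct ByteBuffer instead of
   * waiting for GC to process its phantom reference / Cleaner.
   */
  public static void clean(ByteBuffer buffer) {
    if (buffer == null || !buffer.isDirect()) {
      return;
    }
    Cleaner cleaner = ((DirectBuffer) buffer).cleaner();
    if (cleaner != null) {           // view/slice buffers have no cleaner
      cleaner.clean();
    }
  }
}
{code}

(On Java 9+ the same effect would need Unsafe.invokeCleaner(ByteBuffer) instead, since 
these sun.* classes are no longer accessible.)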

The related code:

SnappyDecompressor.java

{code}
@Override
public synchronized void setInput(byte[] buffer, int off, int len) {
  SnappyUtil.validateBuffer(buffer, off, len);

  if (inputBuffer.capacity() - inputBuffer.position() < len) {
    ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
    inputBuffer.rewind();
    newBuffer.put(inputBuffer);
    inputBuffer = newBuffer;
    // add oldBuffer.cleaner().clean() here
  } else {
    inputBuffer.limit(inputBuffer.position() + len);
  }
  inputBuffer.put(buffer, off, len);
}
{code}
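
A minimal sketch of the change the inline comment above suggests; note that a 
reference to the old buffer has to be kept before reassignment so there is 
something left to clean (DirectBufferCleaner.clean() is the hypothetical helper 
from the sketch above). The same would apply to the outputBuffer growth in 
decompress() below:

{code}
@Override
public synchronized void setInput(byte[] buffer, int off, int len) {
  SnappyUtil.validateBuffer(buffer, off, len);

  if (inputBuffer.capacity() - inputBuffer.position() < len) {
    ByteBuffer newBuffer = ByteBuffer.allocateDirect(inputBuffer.position() + len);
    inputBuffer.rewind();
    newBuffer.put(inputBuffer);
    ByteBuffer oldBuffer = inputBuffer;    // keep a handle before reassignment
    inputBuffer = newBuffer;
    DirectBufferCleaner.clean(oldBuffer);  // free the old native memory eagerly
  } else {
    inputBuffer.limit(inputBuffer.position() + len);
  }
  inputBuffer.put(buffer, off, len);
}
{code}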


{code}
public synchronized int decompress(byte[] buffer, int off, int len) throws IOException {
  SnappyUtil.validateBuffer(buffer, off, len);
  if (inputBuffer.position() == 0 && !outputBuffer.hasRemaining()) {
    return 0;
  }

  if (!outputBuffer.hasRemaining()) {
    inputBuffer.rewind();
    Preconditions.checkArgument(inputBuffer.position() == 0, "Invalid position of 0.");
    Preconditions.checkArgument(outputBuffer.position() == 0, "Invalid position of 0.");
    // There is compressed input, decompress it now.
    int decompressedSize = Snappy.uncompressedLength(inputBuffer);
    if (decompressedSize > outputBuffer.capacity()) {
      outputBuffer = ByteBuffer.allocateDirect(decompressedSize);
      // add oldBuffer.cleaner().clean() here...
    }
    // Reset the previous outputBuffer (i.e. set position to 0)
    outputBuffer.clear();
    int size = Snappy.uncompress(inputBuffer, outputBuffer);
    // ...
{code}


What's more, we set BUFFER_SIZE_CONFIG (io.file.buffer.size) = 32K, and the sequence 
of compressed data sizes looks like: 32K, 250K, 10MB, 32MB, 64MB...

So the SnappyDecompressor inputBuffer has to grow whenever the input data is larger 
than the current inputBuffer, but since the data is fed in 32K chunks, the buffer 
only grows in 32K steps: setInput() is called many times, allocateDirect() is called 
many times (reaching a 64MB input in 32K increments is on the order of 2,000 calls), 
and a large number of phantom-reachable DirectByteBuffers pile up.

It may be better to have an adaptive strategy to grow and shrink the 
SnappyDecompressor inputBuffer/outputBuffer.
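
For example, a growth policy that at least doubles the capacity (and eagerly cleans 
the buffer it replaces) would keep the number of direct allocations logarithmic in 
the input size. A minimal sketch under those assumptions (class and method names are 
ours, not parquet-mr's; DirectBufferCleaner is the hypothetical helper above):

{code}
import java.nio.ByteBuffer;

/** Sketch of a geometrically growing direct buffer. */
public class GrowableDirectBuffer {
  private ByteBuffer buffer = ByteBuffer.allocateDirect(0);

  /** Make room for len more bytes, at least doubling capacity to bound reallocations. */
  public ByteBuffer ensureRemaining(int len) {
    int needed = buffer.position() + len;
    if (buffer.capacity() < needed) {
      // Doubling means feeding a 64MB input in 32K chunks costs about a dozen
      // reallocations instead of ~2000 with a grow-by-len policy.
      int newCapacity = Math.max(needed, buffer.capacity() * 2);
      ByteBuffer newBuffer = ByteBuffer.allocateDirect(newCapacity);
      buffer.flip();                   // limit = bytes written so far, position = 0
      newBuffer.put(buffer);           // copy existing content
      ByteBuffer old = buffer;
      buffer = newBuffer;
      DirectBufferCleaner.clean(old);  // free the replaced buffer's native memory
    }
    return buffer;
  }
}
{code}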


> Provide option to use on-heap buffers for Snappy compression/decompression
> --------------------------------------------------------------------------
>
>                 Key: PARQUET-118
>                 URL: https://issues.apache.org/jira/browse/PARQUET-118
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>    Affects Versions: 1.6.0
>            Reporter: Patrick Wendell
>
> The current code uses direct off-heap buffers for decompression. If many 
> decompressors are instantiated across multiple threads, and/or the objects 
> being decompressed are large, this can lead to a huge amount of off-heap 
> allocation by the JVM. This can be exacerbated if, overall, there is no heap 
> contention, since no GC will be performed to reclaim the space used by these 
> buffers.
> It would be nice if there were a flag we could use to simply allocate on-heap 
> buffers here:
> https://github.com/apache/incubator-parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java#L28
> We ran into an issue today where these buffers totaled a very large amount of 
> storage and caused our Java processes (running within containers) to be 
> terminated by the kernel OOM-killer.
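
For what it's worth, a purely on-heap path probably cannot just swap 
ByteBuffer.allocateDirect for ByteBuffer.allocate, since snappy-java's 
ByteBuffer-based compress/uncompress calls expect direct buffers; it would more 
likely go through Snappy's byte[] API. A minimal sketch under that assumption 
(the class is illustrative, not an actual parquet-mr class):

{code}
import java.io.IOException;

import org.xerial.snappy.Snappy;

public class HeapSnappyDecompress {
  /**
   * Decompress entirely on the JVM heap using snappy-java's byte[] API,
   * avoiding direct ByteBuffer allocation altogether.
   */
  public static byte[] decompress(byte[] compressed, int off, int len) throws IOException {
    int uncompressedLength = Snappy.uncompressedLength(compressed, off, len);
    byte[] output = new byte[uncompressedLength];
    Snappy.uncompress(compressed, off, len, output, 0);
    return output;
  }
}
{code}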



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
