[ https://issues.apache.org/jira/browse/HADOOP-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15239818#comment-15239818 ]

Jeff Faust commented on HADOOP-12619:
-------------------------------------

Observed this behavior in 2.7.1 with
org.apache.hadoop.io.compress.BZip2Codec::createOutputStream(OutputStream):
the process would eventually exhaust the native memory on the host and crash.
Here's the relevant code:
{code}
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
    throws IOException {
  return CompressionCodec.Util.
      createOutputStreamWithCodecPool(this, conf, out);
}
{code}
createOutputStreamWithCodecPool gets a compressor from the CodecPool, calls
codec.createOutputStream(out, compressor), and then calls
CompressionOutputStream::setTrackedCompressor(compressor) so that the
compressor can be returned to the pool later by the CompressionOutputStream:
{code}
static CompressionOutputStream createOutputStreamWithCodecPool(
    CompressionCodec codec, Configuration conf, OutputStream out)
    throws IOException {
  Compressor compressor = CodecPool.getCompressor(codec, conf);
  CompressionOutputStream stream = null;
  try {
    stream = codec.createOutputStream(out, compressor);
  } finally {
    if (stream == null) {
      CodecPool.returnCompressor(compressor);
    } else {
      stream.setTrackedCompressor(compressor);
    }
  }
  return stream;
}
{code}
CompressionOutputStream has a private trackedCompressor attribute that it 
returns to the CodecPool on close():
{code}
private Compressor trackedCompressor;

void setTrackedCompressor(Compressor compressor) {
  trackedCompressor = compressor;
}

@Override
public void close() throws IOException {
  finish();
  out.close();
  if (trackedCompressor != null) {
    CodecPool.returnCompressor(trackedCompressor);
    trackedCompressor = null;
  }
}
{code}
This would be great, but when
CompressionCodec.Util.createOutputStreamWithCodecPool calls
codec.createOutputStream(out, compressor), BZip2Codec never actually creates a
plain CompressionOutputStream. It creates an instance of one of two subclasses:
{code}
@Override
public CompressionOutputStream createOutputStream(OutputStream out,
    Compressor compressor) throws IOException {
  return Bzip2Factory.isNativeBzip2Loaded(conf) ?
      new CompressorStream(out, compressor,
                           conf.getInt("io.file.buffer.size", 4*1024)) :
      new BZip2CompressionOutputStream(out);
}
{code}
Each of these subclasses (CompressorStream and BZip2CompressionOutputStream)
overrides close(), and it is one of those two implementations that runs when
the returned stream is closed. Neither returns the compressor to the pool, so
the pool is always empty: every time you ask the CodecPool for a compressor it
creates a new one, allocating more native memory.

One workaround is to deal with the CodecPool directly: use the
BZip2Codec::createOutputStream overload that takes a Compressor as its second
argument, and return the compressor to the CodecPool yourself as soon as
you're finished with it.
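Roughly, the workaround has the shape below. Since the real CodecPool and
Compressor classes need the Hadoop jars on the classpath, this self-contained
sketch stands in a minimal pool that just counts allocations (the Pool,
leakyUse, and pooledUse names are illustrative, not Hadoop API); the
try/finally shape is the part that matters:

```java
import java.util.ArrayDeque;
import java.util.Deque;

public class PoolWorkaround {
    // Stand-in for a pooled native resource (Hadoop's Compressor).
    static class Compressor {}

    // Stand-in for CodecPool: reuses returned instances and only
    // allocates when the pool is empty.
    static class Pool {
        private final Deque<Compressor> idle = new ArrayDeque<>();
        int allocations = 0;

        Compressor get() {
            Compressor c = idle.poll();
            if (c == null) {
                allocations++;
                c = new Compressor();
            }
            return c;
        }

        void put(Compressor c) {
            idle.push(c);
        }
    }

    // The buggy shape: the compressor is taken but never returned,
    // as happens when the subclass close() skips the pool.
    static void leakyUse(Pool pool) {
        Compressor compressor = pool.get();
        // ... write through the stream, then close it ...
        // (no pool.put -- the compressor is abandoned)
    }

    // The workaround shape: the caller owns the compressor and
    // returns it in a finally block.
    static void pooledUse(Pool pool) {
        Compressor compressor = pool.get();
        try {
            // ... codec.createOutputStream(out, compressor), write, close ...
        } finally {
            pool.put(compressor);  // always hand it back
        }
    }

    public static void main(String[] args) {
        Pool leaky = new Pool();
        Pool fixed = new Pool();
        for (int i = 0; i < 100; i++) {
            leakyUse(leaky);
            pooledUse(fixed);
        }
        // Abandoning compressors forces a fresh allocation every time;
        // returning them means a single instance is reused.
        System.out.println(leaky.allocations + " vs " + fixed.allocations);  // prints "100 vs 1"
    }
}
```

With the real classes the calls are the ones this issue already shows:
CodecPool.getCompressor(codec, conf), codec.createOutputStream(out,
compressor), and CodecPool.returnCompressor(compressor) in the finally block.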

> Native memory leaks in CompressorStream
> ---------------------------------------
>
>                 Key: HADOOP-12619
>                 URL: https://issues.apache.org/jira/browse/HADOOP-12619
>             Project: Hadoop Common
>          Issue Type: Bug
>    Affects Versions: 2.4.0
>            Reporter: wangchao
>
> The constructor of org.apache.hadoop.io.compress.CompressorStream requires an
> org.apache.hadoop.io.compress.Compressor object to compress bytes, but it
> does not invoke the compressor's end method when the close method is called.
> This may cause a native memory leak if the compressor is used only by this
> CompressorStream object.
> I found this when setting up a Flume agent with gzip compression; the native
> memory grows slowly and never falls back.
> {code}
>   @Override
>   public CompressionOutputStream createOutputStream(OutputStream out)
>       throws IOException {
>     return (ZlibFactory.isNativeZlibLoaded(conf)) ?
>         new CompressorStream(out, createCompressor(),
>                              conf.getInt("io.file.buffer.size", 4*1024)) :
>         new GzipOutputStream(out);
>   }
>
>   @Override
>   public Compressor createCompressor() {
>     return (ZlibFactory.isNativeZlibLoaded(conf))
>       ? new GzipZlibCompressor(conf)
>       : null;
>   }
> {code}
> The relevant methods of CompressorStream are:
> {code}
>   @Override
>   public void close() throws IOException {
>     if (!closed) {
>       finish();
>       out.close();
>       closed = true;
>     }
>   }
>   @Override
>   public void finish() throws IOException {
>     if (!compressor.finished()) {
>       compressor.finish();
>       while (!compressor.finished()) {
>         compress();
>       }
>     }
>   }
> {code}
> No one will end the compressor.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
