[
https://issues.apache.org/jira/browse/HADOOP-12619?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15239818#comment-15239818
]
Jeff Faust commented on HADOOP-12619:
-------------------------------------
Observed this behavior in 2.7.1 with
org.apache.hadoop.io.compress.BZip2Codec::createOutputStream(OutputStream):
the process would eventually exhaust the native memory on the host and crash.
Here's the relevant code:
{code}
@Override
public CompressionOutputStream createOutputStream(OutputStream out)
    throws IOException {
  return CompressionCodec.Util.
      createOutputStreamWithCodecPool(this, conf, out);
}
{code}
createOutputStreamWithCodecPool gets a compressor from the CodecPool, calls
codec.createOutputStream(out, compressor), and then calls
CompressionOutputStream::setTrackedCompressor(compressor) so that the
CompressionOutputStream can return the compressor to the pool later:
{code}
static CompressionOutputStream createOutputStreamWithCodecPool(
    CompressionCodec codec, Configuration conf, OutputStream out)
    throws IOException {
  Compressor compressor = CodecPool.getCompressor(codec, conf);
  CompressionOutputStream stream = null;
  try {
    stream = codec.createOutputStream(out, compressor);
  } finally {
    if (stream == null) {
      CodecPool.returnCompressor(compressor);
    } else {
      stream.setTrackedCompressor(compressor);
    }
  }
  return stream;
}
{code}
CompressionOutputStream has a private trackedCompressor attribute that it
returns to the CodecPool on close():
{code}
private Compressor trackedCompressor;

void setTrackedCompressor(Compressor compressor) {
  trackedCompressor = compressor;
}

@Override
public void close() throws IOException {
  finish();
  out.close();
  if (trackedCompressor != null) {
    CodecPool.returnCompressor(trackedCompressor);
    trackedCompressor = null;
  }
}
{code}
This would work, but when
CompressionCodec.Util.createOutputStreamWithCodecPool calls
codec.createOutputStream(out, compressor), BZip2Codec never returns a plain
CompressionOutputStream. It returns one of two subclasses:
{code}
@Override
public CompressionOutputStream createOutputStream(OutputStream out,
    Compressor compressor) throws IOException {
  return Bzip2Factory.isNativeBzip2Loaded(conf) ?
      new CompressorStream(out, compressor,
          conf.getInt("io.file.buffer.size", 4*1024)) :
      new BZip2CompressionOutputStream(out);
}
{code}
Each of these subclasses (CompressorStream and BZip2CompressionOutputStream)
overrides close(), and it is one of these two implementations that runs when
the returned stream is closed. Neither implementation returns the tracked
compressor to the pool, so the pool never has an idle compressor to hand out:
every request to the CodecPool creates a new compressor, allocating more
native memory.
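To make the mechanism concrete, here is a minimal, self-contained Java model of the pattern above. It is a sketch, not Hadoop code: LeakModel, FakeCompressor, Pool, BaseStream, LeakyStream and openAndClose are hypothetical stand-ins for the compressor, the CodecPool, CompressionOutputStream and its two subclasses. When the subclass overrides close() without handing the tracked compressor back, every round trip through the pool allocates a new compressor; with the base close(), a single compressor is reused.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the tracked-compressor pattern; not Hadoop classes.
class LeakModel {
    // Stand-in for a native compressor; counts how many were ever allocated.
    static final class FakeCompressor {
        static int allocated = 0;
        FakeCompressor() { allocated++; }
    }

    // Stand-in for CodecPool: reuse an idle compressor if one was returned,
    // otherwise allocate a new one.
    static final class Pool {
        private final Deque<FakeCompressor> idle = new ArrayDeque<>();

        FakeCompressor get() {
            FakeCompressor c = idle.poll();
            return (c != null) ? c : new FakeCompressor();
        }

        void put(FakeCompressor c) { idle.push(c); }
    }

    // Stand-in for CompressionOutputStream: close() returns the tracked compressor.
    static class BaseStream {
        protected final Pool pool;
        private FakeCompressor tracked;

        BaseStream(Pool pool) { this.pool = pool; }

        void setTrackedCompressor(FakeCompressor c) { tracked = c; }

        void close() {
            if (tracked != null) {
                pool.put(tracked);
                tracked = null;
            }
        }
    }

    // Stand-in for the subclasses: overrides close() without calling
    // super.close(), so the tracked compressor never goes back to the pool.
    static class LeakyStream extends BaseStream {
        LeakyStream(Pool pool) { super(pool); }

        @Override
        void close() {
            // The real override finishes and closes the wrapped stream;
            // modeled as a no-op here.
        }
    }

    // Mirrors createOutputStreamWithCodecPool followed by close(), n times,
    // and reports how many compressors were allocated.
    static int openAndClose(int n, boolean leaky) {
        FakeCompressor.allocated = 0;
        Pool pool = new Pool();
        for (int i = 0; i < n; i++) {
            FakeCompressor c = pool.get();
            BaseStream s = leaky ? new LeakyStream(pool) : new BaseStream(pool);
            s.setTrackedCompressor(c);
            s.close();
        }
        return FakeCompressor.allocated;
    }

    public static void main(String[] args) {
        System.out.println("leaky: " + openAndClose(5, true));  // 5: new compressor per stream
        System.out.println("base:  " + openAndClose(5, false)); // 1: same compressor reused
    }
}
```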
One workaround is to deal with the CodecPool directly: use the
BZip2Codec::createOutputStream method that takes a compressor as its second
argument and, of course, return the compressor to the CodecPool yourself as
soon as you're finished with it.
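The workaround can be sketched with a similar hypothetical model (again not Hadoop code; WorkaroundModel, FakeCompressor, Pool, LeakyStream and writeFiles are illustrative names). The caller borrows the compressor, passes it to the stream, and returns it in a finally block once the stream is closed. In Hadoop the corresponding calls would be CodecPool.getCompressor(codec, conf), codec.createOutputStream(out, compressor) and CodecPool.returnCompressor(compressor). Even though the stream's close() never returns the compressor, only one is ever allocated.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of the caller-managed workaround; not Hadoop classes.
class WorkaroundModel {
    // Stand-in for a native compressor; counts allocations.
    static final class FakeCompressor {
        static int allocated = 0;
        FakeCompressor() { allocated++; }
    }

    // Stand-in for CodecPool.
    static final class Pool {
        private final Deque<FakeCompressor> idle = new ArrayDeque<>();

        FakeCompressor getCompressor() {
            FakeCompressor c = idle.poll();
            return (c != null) ? c : new FakeCompressor();
        }

        void returnCompressor(FakeCompressor c) {
            if (c != null) {
                idle.push(c);
            }
        }
    }

    // Like the streams returned by BZip2Codec: close() does not return the compressor.
    static final class LeakyStream implements AutoCloseable {
        private final FakeCompressor compressor;

        LeakyStream(FakeCompressor compressor) { this.compressor = compressor; }

        void write(byte[] data) {
            // A real stream would compress data with this.compressor.
        }

        @Override
        public void close() {
            // Closes the wrapped stream only; the compressor is not returned.
        }
    }

    // Writes n "files", borrowing and returning the compressor explicitly.
    static int writeFiles(int n) {
        FakeCompressor.allocated = 0;
        Pool pool = new Pool();
        for (int i = 0; i < n; i++) {
            FakeCompressor c = pool.getCompressor();
            try (LeakyStream s = new LeakyStream(c)) {
                s.write(new byte[] {1, 2, 3});
            } finally {
                // Runs after the stream is closed, even if write or close throws.
                pool.returnCompressor(c);
            }
        }
        return FakeCompressor.allocated;
    }

    public static void main(String[] args) {
        System.out.println("compressors allocated: " + writeFiles(5)); // 1
    }
}
```

Returning the compressor in finally, after try-with-resources has closed the stream, means it goes back to the pool only once the stream no longer uses it, and still goes back if writing or closing fails.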
> Native memory leaks in CompressorStream
> ---------------------------------------
>
> Key: HADOOP-12619
> URL: https://issues.apache.org/jira/browse/HADOOP-12619
> Project: Hadoop Common
> Issue Type: Bug
> Affects Versions: 2.4.0
> Reporter: wangchao
>
> The constructor of org.apache.hadoop.io.compress.CompressorStream requires an
> org.apache.hadoop.io.compress.Compressor object to compress bytes, but
> CompressorStream does not invoke the compressor's end method when close is
> called. This may cause native memory leaks if the compressor is used only by
> this CompressorStream object.
> I found this when I set up a Flume agent with gzip compression: the native
> memory grows slowly and never falls back.
> {code}
> @Override
> public CompressionOutputStream createOutputStream(OutputStream out)
>     throws IOException {
>   return (ZlibFactory.isNativeZlibLoaded(conf)) ?
>       new CompressorStream(out, createCompressor(),
>           conf.getInt("io.file.buffer.size", 4*1024)) :
>       new GzipOutputStream(out);
> }
>
> @Override
> public Compressor createCompressor() {
>   return (ZlibFactory.isNativeZlibLoaded(conf))
>       ? new GzipZlibCompressor(conf)
>       : null;
> }
> {code}
> The close() and finish() methods of CompressorStream are:
> {code}
> @Override
> public void close() throws IOException {
>   if (!closed) {
>     finish();
>     out.close();
>     closed = true;
>   }
> }
>
> @Override
> public void finish() throws IOException {
>   if (!compressor.finished()) {
>     compressor.finish();
>     while (!compressor.finished()) {
>       compress();
>     }
>   }
> }
> {code}
> Nothing ever calls end() on the compressor.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)