devinbost opened a new issue #6413: [pulsar-proxy] Netty resource leak URL: https://github.com/apache/pulsar/issues/6413 I found this resource leak in the logs while debugging this issue: https://github.com/apache/pulsar/issues/6332 ``` 22:02:46.655 [pulsar-client-io-33-1:io.netty.util.ResourceLeakDetector@317] ERROR io.netty.util.ResourceLeakDetector - LEAK: ByteBuf.release() was not called before it's garbage-collected. See https://netty.io/wiki/reference-counted-objects.html for more information. Recent access records: #1: io.netty.buffer.CompositeByteBuf$Component.free(CompositeByteBuf.java:1915) io.netty.buffer.CompositeByteBuf.deallocate(CompositeByteBuf.java:2220) io.netty.buffer.AbstractReferenceCountedByteBuf.handleRelease(AbstractReferenceCountedByteBuf.java:110) io.netty.buffer.AbstractReferenceCountedByteBuf.release(AbstractReferenceCountedByteBuf.java:100) io.netty.util.ReferenceCountUtil.release(ReferenceCountUtil.java:88) io.netty.util.ReferenceCountUtil.safeRelease(ReferenceCountUtil.java:113) io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer0(AbstractEpollChannel.java:337) io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer(AbstractEpollChannel.java:326) io.netty.channel.epoll.AbstractEpollChannel.newDirectBuffer(AbstractEpollChannel.java:304) io.netty.channel.epoll.AbstractEpollStreamChannel.filterOutboundMessage(AbstractEpollStreamChannel.java:521) io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:874) io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1379) io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:716) io.netty.channel.AbstractChannelHandlerContext.invokeWriteAndFlush(AbstractChannelHandlerContext.java:763) io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:789) io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:757) io.netty.channel.AbstractChannelHandlerContext.writeAndFlush(AbstractChannelHandlerContext.java:812) io.netty.channel.DefaultChannelPipeline.writeAndFlush(DefaultChannelPipeline.java:1037) io.netty.channel.AbstractChannel.writeAndFlush(AbstractChannel.java:293) org.apache.pulsar.proxy.server.ProxyConnection.channelRead(ProxyConnection.java:184) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) org.apache.pulsar.proxy.server.ParserProxyHandler.channelRead(ParserProxyHandler.java:177) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #2: io.netty.buffer.AdvancedLeakAwareByteBuf.readByte(AdvancedLeakAwareByteBuf.java:400) org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.readRawVarint32(ByteBufCodedInputStream.java:268) org.apache.pulsar.common.util.protobuf.ByteBufCodedInputStream.readTag(ByteBufCodedInputStream.java:94) org.apache.pulsar.common.api.proto.PulsarApi$MessageMetadata$Builder.mergeFrom(PulsarApi.java:4620) org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:421) org.apache.pulsar.common.api.raw.MessageParser.parseMessage(MessageParser.java:69) org.apache.pulsar.proxy.server.ParserProxyHandler.channelRead(ParserProxyHandler.java:120) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #3: io.netty.buffer.AdvancedLeakAwareByteBuf.getShort(AdvancedLeakAwareByteBuf.java:166) org.apache.pulsar.common.protocol.Commands.hasChecksum(Commands.java:391) org.apache.pulsar.common.protocol.Commands.skipChecksumIfPresent(Commands.java:405) org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:414) org.apache.pulsar.common.api.raw.MessageParser.parseMessage(MessageParser.java:69) org.apache.pulsar.proxy.server.ParserProxyHandler.channelRead(ParserProxyHandler.java:120) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #4: Hint: 'inboundParser' will handle the message from this point. io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:326) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:300) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #5: io.netty.buffer.AdvancedLeakAwareByteBuf.getUnsignedInt(AdvancedLeakAwareByteBuf.java:196) io.netty.handler.codec.LengthFieldBasedFrameDecoder.getUnadjustedFrameLength(LengthFieldBasedFrameDecoder.java:466) io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:408) io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:334) io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:503) io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:442) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #6: io.netty.buffer.AdvancedLeakAwareByteBuf.order(AdvancedLeakAwareByteBuf.java:70) io.netty.handler.codec.LengthFieldBasedFrameDecoder.getUnadjustedFrameLength(LengthFieldBasedFrameDecoder.java:453) io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:408) io.netty.handler.codec.LengthFieldBasedFrameDecoder.decode(LengthFieldBasedFrameDecoder.java:334) io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:503) io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:442) io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:281) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #7: Hint: 'newFrameDecoder' will handle the message from this point. io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:352) io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1422) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:374) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:360) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) #8: Hint: 'DefaultChannelPipeline$HeadContext#0' will handle the message from this point. io.netty.channel.DefaultChannelPipeline.touch(DefaultChannelPipeline.java:116) io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:357) io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:931) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:792) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) Created at: io.netty.buffer.UnpooledByteBufAllocator.newDirectBuffer(UnpooledByteBufAllocator.java:96) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:183) org.apache.bookkeeper.common.allocator.impl.ByteBufAllocatorImpl.newDirectBuffer(ByteBufAllocatorImpl.java:158) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:187) io.netty.buffer.AbstractByteBufAllocator.directBuffer(AbstractByteBufAllocator.java:178) io.netty.channel.unix.PreferredDirectByteBufAllocator.ioBuffer(PreferredDirectByteBufAllocator.java:53) io.netty.channel.DefaultMaxMessagesRecvByteBufAllocator$MaxMessageHandle.allocate(DefaultMaxMessagesRecvByteBufAllocator.java:114) io.netty.channel.epoll.EpollRecvByteAllocatorHandle.allocate(EpollRecvByteAllocatorHandle.java:75) io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:777) io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:502) io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) io.netty.util.concurrent.SingleThreadEventExecutor$6.run(SingleThreadEventExecutor.java:1050) io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) java.lang.Thread.run(Thread.java:748) : 37 leak records were discarded because the leak record count is targeted to 4. Use system property io.netty.leakDetection.targetRecords to increase the limit. ``` I have attached the complete log here: [org.apache.pulsar.proxy.server.ProxyParserTest-output.txt](https://github.com/apache/pulsar/files/4247107/org.apache.pulsar.proxy.server.ProxyParserTest-output.txt) The run is here: https://github.com/apache/pulsar/pull/6202/checks?check_run_id=465789978
---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services
