lhotari opened a new pull request, #22760: URL: https://github.com/apache/pulsar/pull/22760
Fixes #22601 #21892 #19460

### Motivation

In Pulsar, there are multiple reported issues where the transferred output gets corrupted and fails with exceptions about invalid reader and writer indexes. One class of these issues occurs only when TLS is enabled. The cause is that Netty's SslHandler uses the `internalNioBuffer` method on the input ByteBuf instances. Sharing the same `internalNioBuffer` instance across multiple threads is not thread safe.

I found these Netty issues and commits, which provide a lot of context:

* https://github.com/netty/netty/issues/6184
* https://github.com/netty/netty/issues/2761
* https://github.com/netty/netty/issues/1865
* https://github.com/netty/netty/commit/a74149e9848fef73d909dd56843e0fbab23f877d
* https://github.com/netty/netty/issues/1801
* https://github.com/netty/netty/issues/1797
* https://github.com/netty/netty/commit/6f79291d5bf7c515549234ed93dc00b84ce008c9
* https://github.com/netty/netty/issues/1925

This appears to be a long-standing issue in Netty that has been only partially fixed. Many locations in the Netty code base remain unfixed, so it is not safe to share ByteBuf instances in all cases.

In Pulsar, ByteBuf instances are shared in this case at least via the broker cache ([RangeEntryCacheManagerImpl](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java)) and the pending reads manager (`org.apache.bookkeeper.mledger.impl.cache.PendingReadsManager`).

The SslHandler related issue was originally reported in Pulsar in 2018 with #2401. The fix at that time was #2464, which used the ByteBuf `.copy()` method to copy the ByteBuf. The problem with that change is that `.copy()` itself isn't thread safe and accesses the `internalNioBuffer` instance directly. This happens at least when the ByteBuf instance contains a ReadOnlyByteBufferBuf wrapper.
This can be seen in the code at https://github.com/netty/netty/blob/243de91df2e9a9bf0ad938f54f76063c14ba6e3d/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBufferBuf.java#L412-L433.

As a result, exceptions such as the following occur:

```
java.lang.IllegalArgumentException: newPosition > limit: (2094 > 88)
    at java.base/java.nio.Buffer.createPositionException(Buffer.java:341)
    at java.base/java.nio.Buffer.position(Buffer.java:316)
    at java.base/java.nio.ByteBuffer.position(ByteBuffer.java:1516)
    at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:185)
    at io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:268)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1113)
    at io.netty.buffer.ReadOnlyByteBufferBuf.copy(ReadOnlyByteBufferBuf.java:431)
    at io.netty.buffer.DuplicatedByteBuf.copy(DuplicatedByteBuf.java:210)
    at io.netty.buffer.AbstractByteBuf.copy(AbstractByteBuf.java:1194)
    at org.apache.pulsar.common.protocol.ByteBufPair$CopyingEncoder.write(ByteBufPair.java:149)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:893)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
    at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$1(PulsarCommandSenderImpl.java:277)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:840)
```

```
java.nio.BufferUnderflowException
    at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:183)
    at io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:268)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1113)
    at io.netty.buffer.ReadOnlyByteBufferBuf.copy(ReadOnlyByteBufferBuf.java:431)
    at io.netty.buffer.DuplicatedByteBuf.copy(DuplicatedByteBuf.java:210)
    at io.netty.buffer.AbstractByteBuf.copy(AbstractByteBuf.java:1194)
    at org.apache.pulsar.common.protocol.ByteBufPair$CopyingEncoder.write(ByteBufPair.java:149)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:893)
    at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
    at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
    at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$1(PulsarCommandSenderImpl.java:277)
    at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
    at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
    at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
    at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:840)
```

It is likely that `Failed to peek sticky key from the message metadata java.lang.IllegalArgumentException: Invalid unknonwn tag type: 4` exceptions also share this root cause. Broker-side exceptions of the form `java.lang.IndexOutOfBoundsException: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536))` are possibly caused by it as well.

Such exceptions could also have a different root cause. If the readerIndex is mutated, a shared Netty ByteBuf must at least have an independent view created with `duplicate`, `slice` or `retainedDuplicate`. The ByteBuf instance must also be shared in a thread safe way. Failing to do either could result in similar symptoms, and this PR doesn't fix that.

### Modifications

- Replace the use of `.copy()` in the ByteBufPair.CopyingEncoder class with a shallow copy that uses the thread safe `nioBuffer()` and `nioBuffers()` methods.

### Documentation

<!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->

- [ ] `doc` <!-- Your PR contains doc changes. -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->
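
As an illustration of the failure mode described under Motivation, the following sketch uses only `java.nio.ByteBuffer`, not Netty and not the code changed in this PR. It assumes that the essence of the problem is two readers mutating the position and limit of one shared buffer object, and it replays one possible interleaving on a single thread so that the outcome is deterministic. The class and method names are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class; not part of Pulsar or Netty.
class SharedNioBufferDemo {

    // Replays, on one thread, an interleaving that two unsynchronized
    // readers could produce on the same ByteBuffer object.
    // Returns true if the second reader fails with IllegalArgumentException.
    static boolean interleavedReadsOnSharedBufferFail(ByteBuffer shared) {
        // "Reader A" narrows the readable window to bytes [0, 4).
        shared.clear();
        shared.limit(4);
        try {
            // "Reader B" now tries to start at index 8 before A has finished.
            // The position exceeds the limit that A installed.
            shared.position(8);
            return false;
        } catch (IllegalArgumentException e) {
            return true;
        }
    }

    // Same interleaving, but each reader works on its own duplicate().
    // Duplicates share the bytes but keep independent position and limit.
    static String[] interleavedReadsOnDuplicates(ByteBuffer shared) {
        ByteBuffer a = shared.duplicate();
        ByteBuffer b = shared.duplicate();

        a.clear();
        a.limit(4);          // reader A wants bytes [0, 4)

        b.clear();
        b.position(8);       // reader B wants bytes [8, 12)
        b.limit(12);

        byte[] outA = new byte[4];
        a.get(outA);
        byte[] outB = new byte[4];
        b.get(outB);

        return new String[] {
            new String(outA, StandardCharsets.US_ASCII),
            new String(outB, StandardCharsets.US_ASCII)
        };
    }

    public static void main(String[] args) {
        ByteBuffer shared =
            ByteBuffer.wrap("0123456789abcdef".getBytes(StandardCharsets.US_ASCII));

        System.out.println("shared buffer failed: "
            + interleavedReadsOnSharedBufferFail(shared));

        String[] results = interleavedReadsOnDuplicates(shared);
        System.out.println("duplicate A read: " + results[0]);
        System.out.println("duplicate B read: " + results[1]);
    }
}
```

The first method fails in the same way as the `newPosition > limit` stack trace above, because both readers move the indexes of a single buffer object. The second method succeeds because each reader has its own view. This only demonstrates the general hazard; whether a given Netty method hands out a shared or a fresh NIO buffer has to be checked against the Netty source.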
