lhotari opened a new pull request, #22760:
URL: https://github.com/apache/pulsar/pull/22760

   Fixes #22601 #21892 #19460
   
   ### Motivation
   
   In Pulsar, there are multiple reported issues where the transferred output gets corrupted and fails with exceptions about an invalid reader or writer index. One source of these issues is a group of failures that occur only when TLS is enabled.
   
   This is because Netty's `SslHandler` uses the `internalNioBuffer` method on the input `ByteBuf` instances. Sharing the same `internalNioBuffer` instance across multiple threads is not thread safe.
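   The hazard can be illustrated with plain JDK types. In this sketch, `CachedViewBuffer` is a hypothetical class, not Netty code: its `internalView(index, length)` re-targets and returns one cached `java.nio.ByteBuffer`, loosely modeling how `internalNioBuffer(index, length)` reuses an instance, while `freshView(index, length)` returns a new view per call, loosely modeling `nioBuffer(index, length)`. Two callers are interleaved on a single thread so the outcome is deterministic; with real threads the same interleaving happens at random.

   ```java
   import java.nio.ByteBuffer;

   class CachedViewDemo {
       // Hypothetical stand-in for a buffer that caches one NIO view of its memory,
       // in the spirit of Netty's internalNioBuffer(). This is not Netty code.
       static final class CachedViewBuffer {
           private final byte[] memory;
           private ByteBuffer cachedView; // a single instance handed to every caller

           CachedViewBuffer(byte[] memory) {
               this.memory = memory;
           }

           // Like internalNioBuffer(index, length): re-targets and returns the cached view.
           ByteBuffer internalView(int index, int length) {
               if (cachedView == null) {
                   cachedView = ByteBuffer.wrap(memory);
               }
               cachedView.clear();
               cachedView.position(index);
               cachedView.limit(index + length);
               return cachedView;
           }

           // Like nioBuffer(index, length): each call gets its own position and limit.
           ByteBuffer freshView(int index, int length) {
               return ByteBuffer.wrap(memory, index, length).slice();
           }
       }

       // Caller A asks for bytes [0, 4); caller B asks for [4, 8) and reads one byte
       // before A gets to read. Returns how many bytes A still sees in its window.
       static int remainingSeenByA(boolean useCachedView) {
           CachedViewBuffer buf = new CachedViewBuffer(new byte[8]);
           ByteBuffer a = useCachedView ? buf.internalView(0, 4) : buf.freshView(0, 4);
           ByteBuffer b = useCachedView ? buf.internalView(4, 4) : buf.freshView(4, 4);
           b.get();
           return a.remaining();
       }

       public static void main(String[] args) {
           System.out.println("cached view: A sees " + remainingSeenByA(true) + " bytes");  // 3
           System.out.println("fresh view:  A sees " + remainingSeenByA(false) + " bytes"); // 4
       }
   }
   ```

   With the cached view, caller A's window silently moves to B's bytes [5, 8), so A would read the wrong data without any error being raised at that point.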
   
   I found these Netty issues that provide a lot of context:
   * https://github.com/netty/netty/issues/6184
   * https://github.com/netty/netty/issues/2761
   * https://github.com/netty/netty/issues/1865
     * https://github.com/netty/netty/commit/a74149e9848fef73d909dd56843e0fbab23f877d
   * https://github.com/netty/netty/issues/1801
     * https://github.com/netty/netty/issues/1797
     * https://github.com/netty/netty/commit/6f79291d5bf7c515549234ed93dc00b84ce008c9
   * https://github.com/netty/netty/issues/1925
   
   This appears to be a long-standing issue in Netty that has been only partially fixed. Many locations in the Netty code base are still affected, so it is not safe to share `ByteBuf` instances in all cases.
   
   In Pulsar, `ByteBuf` instances are shared in this way at least via the broker cache ([RangeEntryCacheManagerImpl](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/RangeEntryCacheManagerImpl.java)) and the pending reads manager ([PendingReadsManager](https://github.com/apache/pulsar/blob/master/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/cache/PendingReadsManager.java)).
   
   The `SslHandler`-related issue was originally reported in Pulsar in 2018 in #2401. The fix at that time, #2464, used the `ByteBuf.copy()` method to copy the `ByteBuf`. The problem with this change is that `.copy()` itself isn't thread safe: it accesses the `internalNioBuffer` instance directly.
   
   This happens at least when the `ByteBuf` instance wraps a `ReadOnlyByteBufferBuf`, as can be seen in the code at https://github.com/netty/netty/blob/243de91df2e9a9bf0ad938f54f76063c14ba6e3d/buffer/src/main/java/io/netty/buffer/ReadOnlyByteBufferBuf.java#L412-L433 .
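   The failure mode can be reproduced without Netty or real threads. In the JDK-only sketch below, `SharedViewCopy` is a hypothetical class (not the linked Netty source) that copies through one shared `ByteBuffer` view in two steps: it first sets the view's position and limit, then does a bulk `get`. If a second caller re-targets the shared view between those steps, as another thread could, the first caller's `get` fails with a `BufferUnderflowException`, the exception in the second stack trace below. If the limit changes while `get` is already running, its final `position(...)` update can instead fail with `IllegalArgumentException: newPosition > limit`, which is consistent with the first stack trace.

   ```java
   import java.nio.BufferUnderflowException;
   import java.nio.ByteBuffer;

   class SharedViewCopyDemo {
       // Hypothetical buffer that copies through one shared ByteBuffer view in two steps.
       // It mimics the general shape of a copy via a cached NIO view; it is not Netty code.
       static final class SharedViewCopy {
           private final ByteBuffer sharedView;

           SharedViewCopy(byte[] memory) {
               this.sharedView = ByteBuffer.wrap(memory);
           }

           // Step 1: point the shared view at [index, index + length).
           ByteBuffer prepare(int index, int length) {
               sharedView.clear();
               sharedView.position(index);
               sharedView.limit(index + length);
               return sharedView;
           }

           // Step 2: bulk-read `length` bytes from whatever window the view has now.
           static byte[] finish(ByteBuffer view, int length) {
               byte[] dst = new byte[length];
               view.get(dst); // BufferUnderflowException if fewer than `length` bytes remain
               return dst;
           }
       }

       // Without interleaving, the copy succeeds.
       static int sequentialCopyLength() {
           SharedViewCopy buf = new SharedViewCopy(new byte[64]);
           return SharedViewCopy.finish(buf.prepare(0, 32), 32).length;
       }

       // Caller B re-targets the shared view between caller A's two steps,
       // the way a second thread could. Returns true if A's copy underflows.
       static boolean interleavedCopyUnderflows() {
           SharedViewCopy buf = new SharedViewCopy(new byte[64]);
           ByteBuffer viewA = buf.prepare(0, 32); // A wants 32 bytes
           buf.prepare(40, 8);                    // B wants 8 bytes
           try {
               SharedViewCopy.finish(viewA, 32);
               return false;
           } catch (BufferUnderflowException e) {
               return true;
           }
       }

       public static void main(String[] args) {
           System.out.println("sequential copy length: " + sequentialCopyLength());
           System.out.println("interleaved copy underflows: " + interleavedCopyUnderflows());
       }
   }
   ```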
   
   As a result, exceptions such as the following occur:
   ```
   java.lang.IllegalArgumentException: newPosition > limit: (2094 > 88)
       at java.base/java.nio.Buffer.createPositionException(Buffer.java:341)
       at java.base/java.nio.Buffer.position(Buffer.java:316)
       at java.base/java.nio.ByteBuffer.position(ByteBuffer.java:1516)
       at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:185)
       at io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:268)
       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1113)
       at io.netty.buffer.ReadOnlyByteBufferBuf.copy(ReadOnlyByteBufferBuf.java:431)
       at io.netty.buffer.DuplicatedByteBuf.copy(DuplicatedByteBuf.java:210)
       at io.netty.buffer.AbstractByteBuf.copy(AbstractByteBuf.java:1194)
       at org.apache.pulsar.common.protocol.ByteBufPair$CopyingEncoder.write(ByteBufPair.java:149)
       at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:893)
       at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
       at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
       at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
       at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$1(PulsarCommandSenderImpl.java:277)
       at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
       at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
       at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
       at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
       at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
       at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
       at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
       at java.base/java.lang.Thread.run(Thread.java:840)
   ```
   
   ```
   java.nio.BufferUnderflowException
       at java.base/java.nio.HeapByteBuffer.get(HeapByteBuffer.java:183)
       at io.netty.buffer.UnpooledHeapByteBuf.setBytes(UnpooledHeapByteBuf.java:268)
       at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:1113)
       at io.netty.buffer.ReadOnlyByteBufferBuf.copy(ReadOnlyByteBufferBuf.java:431)
       at io.netty.buffer.DuplicatedByteBuf.copy(DuplicatedByteBuf.java:210)
       at io.netty.buffer.AbstractByteBuf.copy(AbstractByteBuf.java:1194)
       at org.apache.pulsar.common.protocol.ByteBufPair$CopyingEncoder.write(ByteBufPair.java:149)
       at io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:893)
       at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:875)
       at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:984)
       at io.netty.channel.AbstractChannelHandlerContext.write(AbstractChannelHandlerContext.java:868)
       at org.apache.pulsar.broker.service.PulsarCommandSenderImpl.lambda$sendMessagesToConsumer$1(PulsarCommandSenderImpl.java:277)
       at io.netty.util.concurrent.AbstractEventExecutor.runTask(AbstractEventExecutor.java:173)
       at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:166)
       at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:470)
       at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:413)
       at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
       at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
       at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
       at java.base/java.lang.Thread.run(Thread.java:840)
   ```
   
   Exceptions such as `Failed to peek sticky key from the message metadata java.lang.IllegalArgumentException: Invalid unknonwn tag type: 4` are likely caused by the same root cause. Broker-side exceptions of the form `java.lang.IndexOutOfBoundsException: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536))` may have the same root cause as well.
   
   Such exceptions could also have a different root cause. A shared Netty `ByteBuf` must at least be accessed through an independent view created with `duplicate`, `slice`, or `retainedDuplicate` if the `readerIndex` is mutated. The `ByteBuf` instance must also be shared between threads in a thread-safe way. Failing to do either could result in similar symptoms, and this PR doesn't fix those cases.
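   Netty's `duplicate()` and `slice()` give a view with its own reader and writer indices over the same memory. The JDK's `ByteBuffer.duplicate()` behaves analogously for `position` and `limit`, so this JDK-only sketch uses it to show why each consumer of a shared buffer needs its own view. The `readAll` helper is hypothetical, and the sketch does not cover the separate requirement of handing the instance between threads safely.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;

   class IndependentViewDemo {
       // Hypothetical consumer: reads all remaining bytes, advancing the buffer's position.
       static String readAll(ByteBuffer buf) {
           byte[] bytes = new byte[buf.remaining()];
           buf.get(bytes);
           return new String(bytes, StandardCharsets.UTF_8);
       }

       // Two consumers read through the same instance: the first one drains the shared position.
       static String[] readTwiceShared(ByteBuffer source) {
           ByteBuffer shared = source.duplicate();
           return new String[] {readAll(shared), readAll(shared)};
       }

       // Each consumer reads through its own duplicate: same bytes, independent positions.
       static String[] readTwiceIndependent(ByteBuffer source) {
           return new String[] {readAll(source.duplicate()), readAll(source.duplicate())};
       }

       public static void main(String[] args) {
           ByteBuffer cached = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
           String[] shared = readTwiceShared(cached);
           String[] independent = readTwiceIndependent(cached);
           System.out.println("shared:      '" + shared[0] + "', '" + shared[1] + "'");
           System.out.println("independent: '" + independent[0] + "', '" + independent[1] + "'");
       }
   }
   ```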
   
   ### Modifications
   
   - Replace the use of `.copy()` in the `ByteBufPair.CopyingEncoder` class with a shallow copy built from the thread-safe `nioBuffer()` and `nioBuffers()` methods.
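   The idea behind the change can be sketched with plain JDK types. According to the Netty Javadoc, the NIO buffers returned by `nioBuffer()` and `nioBuffers()` have a position and limit that are independent of the `ByteBuf` indices. The hypothetical `shallowCopy` helper below models that with one fresh read-only view per segment, so the copy shares memory with the source (no bytes are copied) while draining it leaves the source untouched. This is an illustration under those assumptions, not the code in this PR; in Netty, such views could be wrapped back into a `ByteBuf`, for example with `Unpooled.wrappedBuffer(ByteBuffer...)`.

   ```java
   import java.nio.ByteBuffer;
   import java.nio.charset.StandardCharsets;

   class ShallowCopyDemo {
       // Hypothetical helper: one fresh read-only view per segment. Each view shares the
       // segment's memory but has its own position and limit, so no bytes are copied.
       static ByteBuffer[] shallowCopy(ByteBuffer[] segments) {
           ByteBuffer[] copy = new ByteBuffer[segments.length];
           for (int i = 0; i < segments.length; i++) {
               copy[i] = segments[i].asReadOnlyBuffer();
           }
           return copy;
       }

       // Drains the views, as a downstream handler (e.g. TLS encryption) would consume them.
       static String drain(ByteBuffer[] views) {
           StringBuilder out = new StringBuilder();
           for (ByteBuffer view : views) {
               byte[] bytes = new byte[view.remaining()];
               view.get(bytes);
               out.append(new String(bytes, StandardCharsets.UTF_8));
           }
           return out.toString();
       }

       public static void main(String[] args) {
           ByteBuffer header = ByteBuffer.wrap("hdr|".getBytes(StandardCharsets.UTF_8));
           ByteBuffer payload = ByteBuffer.wrap("payload".getBytes(StandardCharsets.UTF_8));
           ByteBuffer[] cached = {header, payload}; // stands in for a cached entry

           System.out.println(drain(shallowCopy(cached))); // first consumer: hdr|payload
           System.out.println(drain(shallowCopy(cached))); // second consumer: hdr|payload
           System.out.println(payload.position());         // 0: the source was not drained

           payload.put(0, (byte) 'P');                     // memory is shared, not copied
           System.out.println(drain(shallowCopy(cached))); // hdr|Payload
       }
   }
   ```

   Making the views read-only is a choice of this sketch: it keeps a downstream consumer from writing into memory that other readers of the cached entry still use.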
   
   ### Documentation
   
   <!-- DO NOT REMOVE THIS SECTION. CHECK THE PROPER BOX ONLY. -->
   
   - [ ] `doc` <!-- Your PR contains doc changes. -->
   - [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
   - [x] `doc-not-needed` <!-- Your PR changes do not impact docs -->
   - [ ] `doc-complete` <!-- Docs have been already added -->


-- 
This is an automated message from the Apache Git Service.