hangc0276 commented on PR #3919:
URL: https://github.com/apache/bookkeeper/pull/3919#issuecomment-1508220615

   ### How this bug happens on the Pulsar side
   #### Steps to reproduce
   1. Add `brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor` to `conf/broker.conf` to enable the broker entry metadata interceptor.
   2. Set up a consumer on topic `topic-A`.
   3. Set up a producer with batching disabled and produce a small message (10 bytes) to `topic-A`.
   4. The broker throws the following exception:
   ```
   2023-04-14T17:06:09,622+0800 [broker-topic-workers-OrderedExecutor-0-0] ERROR org.apache.pulsar.common.protocol.Commands - [PersistentSubscription{topic=persistent://pulsar/test_v1/127.0.0.1:8080/healthcheck, name=healthCheck-d1544425-97ea-4874-8972-d30d2391ee8b}] [-1] Failed to parse message metadata
   java.lang.IndexOutOfBoundsException: readerIndex(94) + length(2) exceeds writerIndex(94): UnpooledDuplicatedByteBuf(ridx: 94, widx: 94, cap: 94, unwrapped: CompositeByteBuf(ridx: 94, widx: 94, cap: 94, components=2))
           at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) ~[io.netty-netty-buffer-4.1.89.Final.jar:4.1.89.Final]
           at io.netty.buffer.AbstractByteBuf.readShort(AbstractByteBuf.java:749) ~[io.netty-netty-buffer-4.1.89.Final.jar:4.1.89.Final]
           at org.apache.pulsar.common.protocol.Commands.skipBrokerEntryMetadataIfExist(Commands.java:1692) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
           at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:452) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
           at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
           at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1899) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
           at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1918) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
           at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:142) ~[org.apache.pulsar-pulsar-broker-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
           at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:100) ~[org.apache.pulsar-pulsar-broker-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
           at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:210) ~[org.apache
   ```
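The exception message already tells most of the story: the buffer's readerIndex equals its writerIndex (both 94), so the `readShort()` call in `skipBrokerEntryMetadataIfExist` finds no 2 readable bytes left. A minimal sketch of the same failure mode, assuming netty-buffer 4.1.x is on the classpath; the class and method names are hypothetical, and a zero-filled heap buffer stands in for the real entry.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

// Hypothetical helper: shows that readShort() fails once readerIndex has reached writerIndex.
final class ExhaustedBufferDemo {

    /** Returns true if readShort() throws IndexOutOfBoundsException for the given readerIndex. */
    static boolean readShortFails(int size, int readerIndex) {
        // Zero-filled stand-in for an entry: ridx = 0, widx = size, cap = size.
        ByteBuf entry = Unpooled.wrappedBuffer(new byte[size]);
        try {
            entry.readerIndex(readerIndex); // simulate an already-advanced readerIndex
            entry.readShort();              // needs 2 readable bytes
            return false;
        } catch (IndexOutOfBoundsException e) {
            return true;
        } finally {
            entry.release();
        }
    }

    public static void main(String[] args) {
        System.out.println("readerIndex 94 of 94 -> fails: " + readShortFails(94, 94));
        System.out.println("readerIndex 0 of 94  -> fails: " + readShortFails(94, 0));
    }
}
```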
   
   #### How this bug happens
   1. When an entry is produced to a Pulsar topic with `AppendBrokerTimestampMetadataInterceptor` enabled, the broker adds a broker entry metadata header to the entry and combines the two into a `CompositeByteBuf`.
   
https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L1685-L1687
   
   2. To isolate `readerIndex` and `writerIndex` changes between the Pulsar broker and the BookKeeper client, we create a duplicated ByteBuf for BookieClient to write to the BookKeeper cluster. Ideally, the readerIndex of the broker's own ByteBuf should therefore never change.
   
https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L126-L144
   
   3. However, if the original ByteBuf is a `CompositeByteBuf`, we unwrap the duplicated ByteBuf to speed up the crc32c digest calculation. Computing the digest over the wrapped duplicate would copy the `CompositeByteBuf` data from direct memory to heap memory and put a heavy load on the JVM GC.
   
https://github.com/apache/bookkeeper/blob/35e9da9b55b5d44459d3421e8704be47afc6f914/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java#L141-L142
   
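   4. After calculating the crc32c digest, if the entry is small, we write the unwrapped `CompositeByteBuf` into a new ByteBuf. `buf.writeBytes(unwrapped)` advances the source buffer's readerIndex by the number of bytes copied, so the unwrapped `CompositeByteBuf`'s readerIndex ends up at the end of the buffer. Because the unwrapped buffer is the broker's original `CompositeByteBuf` rather than the duplicate, this change is visible to the broker.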
   
https://github.com/apache/bookkeeper/blob/35e9da9b55b5d44459d3421e8704be47afc6f914/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java#L162
   
   
https://github.com/netty/netty/blob/24a0ac36ea91d1aee647d738f879ac873892d829/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java#L1086-L1099
   
   5. After the entry is written successfully, the broker adds it to the broker's entry cache because the topic has active consumers. At this point, the readerIndex of the original `CompositeByteBuf` `data` has already been moved to the end of the buffer in step 4.
   
https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L231-L238
   
   6. When the consumer fetches messages from the topic, it reads the entry directly from the broker's cache. Because the ByteBuf's readerIndex has already been set to the end of the buffer, parsing the metadata from the entry throws an `IndexOutOfBoundsException`.
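The six steps above can be condensed into a small sketch against Netty's `ByteBuf` API, assuming netty-buffer 4.1.x is on the classpath. It is a simplification, not the actual Pulsar or BookKeeper code: the class and method names are hypothetical, a zero-filled 4-byte buffer stands in for the broker entry metadata header, and `duplicate().unwrap()` followed by `writeBytes` stands in for what `OpAddEntry` and `DigestManager` do. For contrast it also uses the `writeBytes(src, srcIndex, length)` overload, which Netty documents as not modifying the source buffer's indexes; this is an illustration of Netty's semantics, not necessarily the change made in this PR.

```java
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import io.netty.buffer.Unpooled;

// Hypothetical simulation of steps 1-6; not the real OpAddEntry/DigestManager code.
final class ReaderIndexLeakDemo {

    /**
     * Runs steps 1-4 and returns the readerIndex of the broker's original buffer afterwards.
     * preserveSourceIndex selects the absolute-index writeBytes overload.
     */
    static int brokerReaderIndexAfterCopy(boolean preserveSourceIndex) {
        // Step 1 (simplified): hypothetical 4-byte header + 10-byte payload in one CompositeByteBuf.
        CompositeByteBuf data = Unpooled.compositeBuffer();
        data.addComponents(true,
                Unpooled.wrappedBuffer(new byte[4]),
                Unpooled.wrappedBuffer(new byte[10]));

        // Step 2: the client gets a duplicate with its own readerIndex/writerIndex.
        ByteBuf duplicate = data.duplicate();

        // Step 3: unwrap() returns the buffer the duplicate was created from, i.e. `data` itself.
        ByteBuf unwrapped = duplicate.unwrap();

        // Step 4: copy the readable bytes into a fresh buffer.
        ByteBuf copy = Unpooled.buffer(unwrapped.readableBytes());
        if (preserveSourceIndex) {
            // Absolute-index overload: leaves the source's readerIndex unchanged.
            copy.writeBytes(unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
        } else {
            // Relative overload: advances the source's readerIndex by the bytes copied.
            copy.writeBytes(unwrapped);
        }
        copy.release();

        // Steps 5-6: this is the readerIndex the broker cache (and later the consumer) would see.
        int brokerReaderIndex = data.readerIndex();
        data.release(); // also releases the two component buffers
        return brokerReaderIndex;
    }

    public static void main(String[] args) {
        System.out.println("writeBytes(src):              readerIndex = " + brokerReaderIndexAfterCopy(false));
        System.out.println("writeBytes(src, idx, length): readerIndex = " + brokerReaderIndexAfterCopy(true));
    }
}
```

With the relative `writeBytes(src)` overload, the broker's buffer ends with readerIndex 14 (all 14 bytes consumed), which is exactly the state that makes the later `readShort()` fail. With the absolute-index overload, it stays at 0.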

