hangc0276 commented on PR #3919:
URL: https://github.com/apache/bookkeeper/pull/3919#issuecomment-1508220615
### How this bug happens on the Pulsar side
#### Steps to reproduce
1. Add `brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendBrokerTimestampMetadataInterceptor` to `conf/broker.conf` to enable the broker entry metadata interceptor.
2. Set up a consumer on topic-A.
3. Set up a producer with batching disabled and produce a small message (10 bytes) to topic-A.
4. The broker throws the following exception:
```
2023-04-14T17:06:09,622+0800 [broker-topic-workers-OrderedExecutor-0-0] ERROR org.apache.pulsar.common.protocol.Commands - [PersistentSubscription{topic=persistent://pulsar/test_v1/127.0.0.1:8080/healthcheck, name=healthCheck-d1544425-97ea-4874-8972-d30d2391ee8b}] [-1] Failed to parse message metadata
java.lang.IndexOutOfBoundsException: readerIndex(94) + length(2) exceeds writerIndex(94): UnpooledDuplicatedByteBuf(ridx: 94, widx: 94, cap: 94, unwrapped: CompositeByteBuf(ridx: 94, widx: 94, cap: 94, components=2))
    at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) ~[io.netty-netty-buffer-4.1.89.Final.jar:4.1.89.Final]
    at io.netty.buffer.AbstractByteBuf.readShort(AbstractByteBuf.java:749) ~[io.netty-netty-buffer-4.1.89.Final.jar:4.1.89.Final]
    at org.apache.pulsar.common.protocol.Commands.skipBrokerEntryMetadataIfExist(Commands.java:1692) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
    at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:452) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
    at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
    at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1899) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
    at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1918) ~[org.apache.pulsar-pulsar-common-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
    at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:142) ~[org.apache.pulsar-pulsar-broker-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
    at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:100) ~[org.apache.pulsar-pulsar-broker-3.0.0-SNAPSHOT.jar:3.0.0-SNAPSHOT]
    at org.apache.pulsar.broker.service.persistent.PersistentDispatcherSingleActiveConsumer.internalReadEntriesComplete(PersistentDispatcherSingleActiveConsumer.java:210) ~[org.apache
```
#### How this bug happens
1. When an entry is produced to a Pulsar topic with `AppendBrokerTimestampMetadataInterceptor` enabled, the broker adds a broker entry metadata header to the entry and combines header and payload into a `CompositeByteBuf` (hence `components=2` in the log above).
https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L1685-L1687
2. To isolate `readerIndex` and `writerIndex` changes between the Pulsar broker and the BookKeeper client, we pass a duplicated ByteBuf to `BookieClient` for writing to the BookKeeper cluster. Ideally, the broker's own ByteBuf `readerIndex` therefore never changes.
https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L126-L144
3. However, if the original ByteBuf is a CompositeByteBuf, we unwrap the duplicated ByteBuf to speed up the crc32c digest computation: computing the digest on the wrapped duplicate would copy the CompositeByteBuf data from direct memory to heap memory and put a heavy load on the JVM GC. The catch is that `unwrap()` on a duplicate returns the original buffer itself, i.e. the broker's CompositeByteBuf `data` together with its `readerIndex` and `writerIndex`, so the isolation from step 2 is lost.
https://github.com/apache/bookkeeper/blob/35e9da9b55b5d44459d3421e8704be47afc6f914/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java#L141-L142
4. After calculating the crc32c digest, if the entry is small, the unwrapped CompositeByteBuf is written into a new ByteBuf. `buf.writeBytes(unwrapped)` advances the source buffer's `readerIndex` by the number of bytes it transfers, so the unwrapped CompositeByteBuf's `readerIndex` ends up at the end of the buffer.
https://github.com/apache/bookkeeper/blob/35e9da9b55b5d44459d3421e8704be47afc6f914/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java#L162
https://github.com/netty/netty/blob/24a0ac36ea91d1aee647d738f879ac873892d829/buffer/src/main/java/io/netty/buffer/AbstractByteBuf.java#L1086-L1099
5. After the entry is written successfully, the broker adds it to the broker's entry cache because the topic has active consumers. At this point, the `readerIndex` of the original CompositeByteBuf `data` has already been moved to the end of the buffer by step 4.
https://github.com/apache/pulsar/blob/091ee2504ffbe6ec98e354b76e7f4c045e1914aa/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/OpAddEntry.java#L231-L238
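6. When the consumer fetches messages from the topic, the broker serves the entry directly from its cache. Because the ByteBuf's `readerIndex` is already at the end of the buffer, no bytes are readable, and parsing the metadata throws the IndexOutOfBoundsException shown above: `skipBrokerEntryMetadataIfExist` tries to read 2 bytes (`readShort`) at `readerIndex(94)` while `writerIndex` is also 94.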
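To make the index sharing in steps 2 to 6 concrete, here is a minimal toy model. `ToyBuf` and `ReaderIndexDemo` are hypothetical classes written only for illustration: they imitate the index semantics of Netty's `duplicate()`, `unwrap()`, `readShort()` and `writeBytes(ByteBuf)`, not Netty's real implementation. The "fixed" path copies using the duplicate's own indices without touching the source, in the spirit of Netty's `writeBytes(ByteBuf src, int srcIndex, int length)`, which leaves the source's indices alone; this is one possible way to avoid the side effect, not necessarily the change made in this PR.

```java
import java.util.Arrays;

// Hypothetical toy model, NOT Netty: it only mimics how ByteBuf views share
// content while each view owns its own readerIndex/writerIndex.
class ToyBuf {
    final byte[] content;  // storage shared by a buffer and all of its duplicates
    final ToyBuf parent;   // the buffer this view was duplicated from (null for originals)
    int readerIndex;
    int writerIndex;

    ToyBuf(byte[] content) {
        this(content, null, 0, content.length);
    }

    private ToyBuf(byte[] content, ToyBuf parent, int readerIndex, int writerIndex) {
        this.content = content;
        this.parent = parent;
        this.readerIndex = readerIndex;
        this.writerIndex = writerIndex;
    }

    // Like ByteBuf.duplicate(): same content, copied indices that then evolve independently.
    ToyBuf duplicate() {
        return new ToyBuf(content, this, readerIndex, writerIndex);
    }

    // Like unwrap() on a duplicate: returns the ORIGINAL buffer, indices included.
    ToyBuf unwrap() {
        return parent;
    }

    int readableBytes() {
        return writerIndex - readerIndex;
    }

    // Like ByteBuf.readShort(): same bounds check and message as Netty's checkReadableBytes0.
    short readShort() {
        if (readerIndex > writerIndex - 2) {
            throw new IndexOutOfBoundsException(String.format(
                    "readerIndex(%d) + length(%d) exceeds writerIndex(%d)", readerIndex, 2, writerIndex));
        }
        short value = (short) (((content[readerIndex] & 0xFF) << 8) | (content[readerIndex + 1] & 0xFF));
        readerIndex += 2;
        return value;
    }

    // Like dst.writeBytes(src): copies src's readable bytes and ADVANCES src.readerIndex.
    static byte[] readAll(ToyBuf src) {
        byte[] out = Arrays.copyOfRange(src.content, src.readerIndex, src.writerIndex);
        src.readerIndex = src.writerIndex;
        return out;
    }

    // Like dst.writeBytes(src, srcIndex, length): copies without touching src's indices.
    static byte[] getBytes(ToyBuf src, int index, int length) {
        return Arrays.copyOfRange(src.content, index, index + length);
    }
}

class ReaderIndexDemo {
    // Runs steps 1-6 against the toy model and reports what the consumer-side read sees.
    static String simulate(boolean buggy) {
        ToyBuf data = new ToyBuf(new byte[94]);  // step 1: 94-byte entry, like the one in the log
        ToyBuf forBookie = data.duplicate();      // step 2: private indices for the bookie client
        ToyBuf unwrapped = forBookie.unwrap();    // step 3: unwrap() hands back `data` itself
        byte[] copied = buggy
                ? ToyBuf.readAll(unwrapped)       // step 4: moves data.readerIndex to 94
                : ToyBuf.getBytes(unwrapped, forBookie.readerIndex, forBookie.readableBytes());
        ToyBuf cached = data.duplicate();         // step 5: the cached view inherits data's indices
        try {
            cached.readShort();                   // step 6: first read while parsing metadata
            return "ok, copied " + copied.length + " bytes";
        } catch (IndexOutOfBoundsException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate(true));
        System.out.println(simulate(false));
    }
}
```

Running `main` prints `readerIndex(94) + length(2) exceeds writerIndex(94)` for the buggy path, matching the broker log, and `ok, copied 94 bytes` for the index-preserving path.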
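The motivation in step 3 (digesting a composite buffer component by component instead of first copying it into one heap array) can be sketched with the JDK alone. This sketch assumes `java.util.zip.CRC32C` and direct `java.nio.ByteBuffer`s as stand-ins for BookKeeper's digest code and Netty's `CompositeByteBuf` components; `ComponentCrcDemo` and its methods are hypothetical and this is not how `DigestManager` is written. It also shows the same kind of pitfall as the bug: `CRC32C.update(ByteBuffer)` moves the buffer's position to its limit, so each component is handed over as a `duplicate()`.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32C;

// Hypothetical illustration using JDK types only (not BookKeeper or Netty code).
class ComponentCrcDemo {
    // Checksums each component in order, without building a merged heap copy.
    static long crcOverComponents(ByteBuffer... components) {
        CRC32C crc = new CRC32C();
        for (ByteBuffer component : components) {
            // update(ByteBuffer) advances position to limit; pass a duplicate so
            // the caller's buffer keeps its position.
            crc.update(component.duplicate());
        }
        return crc.getValue();
    }

    // Reference path: copy every component into one heap array, then checksum it.
    static long crcOverCopy(ByteBuffer... components) {
        int total = 0;
        for (ByteBuffer c : components) {
            total += c.remaining();
        }
        byte[] merged = new byte[total];
        int offset = 0;
        for (ByteBuffer c : components) {
            int n = c.remaining();
            c.duplicate().get(merged, offset, n); // duplicate keeps c's position unchanged
            offset += n;
        }
        CRC32C crc = new CRC32C();
        crc.update(merged, 0, merged.length);
        return crc.getValue();
    }

    // Builds a direct (off-heap) buffer holding the UTF-8 bytes of s.
    static ByteBuffer direct(String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buf = ByteBuffer.allocateDirect(bytes.length);
        buf.put(bytes);
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer header = direct("broker-entry-metadata");
        ByteBuffer payload = direct("0123456789");
        System.out.println(crcOverComponents(header, payload) == crcOverCopy(header, payload));
        System.out.println(header.position() + " " + payload.position());
    }
}
```

CRC-32C is computed over a byte stream, so feeding the components in order gives the same value as checksumming the merged copy; for example, splitting `"123456789"` into `"1234"` and `"56789"` still yields the standard CRC-32C check value `0xE3069283`.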