michaeljmarshall opened a new pull request #12456:
URL: https://github.com/apache/pulsar/pull/12456


   ### Motivation
   
   @lhotari and I discovered a race condition in the 
`MultiTopicsConsumerImpl<T>` class. The race allows for messages to be 
delivered out of order.
   
   We discovered the bug using the following steps:
   
   1. Create a 100 partition topic.
   2. Produce to the topic at 50k messages per second.
   3. Consume from the topic using a single, exclusive consumer in a single 
thread.
   4. Observe a small percentage of out-of-order messages (on average 13 in 
100,000).
   
   However, the race can happen with a single message and on a topic with a 
single partition.
   
   The race comes in these two code blocks:
   
   
https://github.com/apache/pulsar/blob/7ad46c8c18bb8365c9a2d1233a6cd58ecd6f541f/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L412-L416
   
   
https://github.com/apache/pulsar/blob/7ad46c8c18bb8365c9a2d1233a6cd58ecd6f541f/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L294-L301
   
   The first block is executed on the application's calling thread. The second 
block is executed in either the `MultiTopicsConsumerImpl`'s own 
`internalPinnedExecutor` or the topic partition consumer's 
`internalPinnedExecutor` (these threads are not necessarily the 
same). As such, if the two blocks run at the same time, it is possible 
for `incomingMessages.poll();` to return `null` while 
`nextPendingReceive();` also returns `null`. If this happens, the 
`MultiTopicsConsumerImpl` ends up with a single message in the 
`incomingMessages` queue and a single pending receive in the `pendingReceives` 
queue. The next message to arrive can then complete the pending receive ahead 
of the queued message, which delivers it out of order.
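
To make the interleaving concrete, here is a minimal sketch that replays it step by step on one thread. It is not the real client code: the two queues are simplified stand-ins, `ReceiveRaceSketch` and `replayInterleaving` are hypothetical names, and the messages `m1` and `m2` are made up for illustration. It assumes that an arriving message is handed directly to a parked receive when one exists.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

public class ReceiveRaceSketch {

    /** Replays the problematic interleaving and returns the delivery order. */
    static List<String> replayInterleaving() {
        // Simplified stand-ins for the consumer's two queues (assumption).
        Queue<String> incomingMessages = new ArrayDeque<>();
        Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();
        List<String> delivered = new ArrayList<>();

        // Step 1 (application thread): the receive polls and sees no message.
        String polled = incomingMessages.poll();
        // Step 2 (executor thread): m1 arrives and sees no pending receive.
        CompletableFuture<String> waiter = pendingReceives.poll();
        // Step 3 (executor thread): with no waiter, m1 is queued.
        if (waiter == null) {
            incomingMessages.add("m1");
        }
        // Step 4 (application thread): with no message, the receive is parked.
        CompletableFuture<String> firstReceive = new CompletableFuture<>();
        firstReceive.thenAccept(delivered::add);
        if (polled == null) {
            pendingReceives.add(firstReceive);
        }

        // Step 5: m2 arrives, finds the parked receive, and completes it.
        CompletableFuture<String> next = pendingReceives.poll();
        if (next != null) {
            next.complete("m2");
        } else {
            incomingMessages.add("m2");
        }
        // Step 6: a later receive finds m1 still sitting in the queue.
        delivered.add(incomingMessages.poll());
        return delivered;
    }

    public static void main(String[] args) {
        System.out.println(replayInterleaving()); // prints [m2, m1]
    }
}
```

Steps 1 through 4 are each individually valid; the problem only appears because the two check-then-act sequences overlap.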
   
   This proposed solution follows the paradigm used by @Vanlightly in 
https://github.com/apache/pulsar/pull/11691. Essentially, every code path that 
needs to inspect both the `pendingReceives` and the `incomingMessages` queues 
must run on a single thread: the `internalPinnedExecutor`.
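
The single-thread idea can be sketched roughly as follows. This is an illustration under stated assumptions, not the actual patch: `PinnedExecutorSketch`, its `pinned` executor, and the `demo` helper are hypothetical, and messages are plain strings. Both the receive path and the message-arrival path submit their queue inspection to the same single-threaded executor, so the two check-then-act sequences can no longer interleave.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class PinnedExecutorSketch {
    // Hypothetical stand-in for internalPinnedExecutor: exactly one thread.
    private final ExecutorService pinned = Executors.newSingleThreadExecutor();
    // Both queues are only ever touched from the pinned thread.
    private final Queue<String> incomingMessages = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();

    /** Application-facing receive: the queue check runs on the pinned thread. */
    public CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> result = new CompletableFuture<>();
        pinned.execute(() -> {
            String message = incomingMessages.poll();
            if (message != null) {
                result.complete(message);
            } else {
                pendingReceives.add(result);
            }
        });
        return result;
    }

    /** Message arrival: the same check, serialized on the same thread. */
    public void messageReceived(String message) {
        pinned.execute(() -> {
            CompletableFuture<String> waiter = pendingReceives.poll();
            if (waiter != null) {
                waiter.complete(message);
            } else {
                incomingMessages.add(message);
            }
        });
    }

    public void close() {
        pinned.shutdown();
    }

    /** Produces on one thread, consumes on the caller, returns what was received. */
    static List<String> demo(int count) {
        PinnedExecutorSketch consumer = new PinnedExecutorSketch();
        Thread producer = new Thread(() -> {
            for (int i = 0; i < count; i++) {
                consumer.messageReceived("m" + i);
            }
        });
        producer.start();
        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                received.add(consumer.receiveAsync().join());
            }
        } finally {
            consumer.close();
        }
        return received;
    }

    public static void main(String[] args) {
        List<String> received = demo(1000);
        System.out.println("first=" + received.get(0) + " last=" + received.get(999));
    }
}
```

Because every task on the pinned thread runs to completion before the next one starts, the two queues are never both non-empty, which is the stranded state that made the reordering possible.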
   
   ### Modifications
   
   * Run the callback for `consumer.receiveAsync()` on the 
`internalPinnedExecutor`. I chose to run the whole callback on the 
`internalPinnedExecutor` instead of just the `messageReceived` method. If we 
left the callback using `thenAccept` and ran only `messageReceived` on the 
`internalPinnedExecutor`, there is a chance that the callback would run on the 
calling thread, which can itself be the `internalPinnedExecutor`. In that case 
the `messageReceived` task would be queued behind the running callback and 
would only run after the remaining callback logic that inspects 
`incomingMessages.size()` and decides whether or not to pause the consumer. By 
scheduling the callback on the `internalPinnedExecutor` using 
`thenAcceptAsync`, we guarantee that this code runs together, without the data 
race we're fixing in this PR.
   * Run `MultiTopicsConsumerImpl#internalReceiveAsync` on the 
`internalPinnedExecutor`.
   * Remove the `checkState(message instanceof TopicMessageImpl);` method call 
from the `MultiTopicsConsumerImpl#internalReceiveAsync` method. This decision 
may be controversial. I removed the check because we only ever add 
`TopicMessageImpl` to the `MultiTopicsConsumerImpl`'s `incomingMessages` queue. 
If it is a necessary check, we could complete the future exceptionally when the 
message is not of type `TopicMessageImpl`.
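
The ordering concern in the first bullet can be illustrated with a small sketch. It is not the real consumer code: `CallbackOrderingSketch`, `run`, and the event names `messageReceived` and `pauseCheck` are hypothetical placeholders for the two pieces of callback logic. It assumes the future is already complete and that the caller is itself running on the single-threaded executor, which is the case where `thenAccept` runs the callback inline.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CallbackOrderingSketch {

    /** Returns the order in which the two pieces of callback logic ran. */
    static List<String> run(boolean wholeCallbackOnExecutor) {
        ExecutorService pinned = Executors.newSingleThreadExecutor();
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        // An already-completed future makes thenAccept run on the calling thread.
        CompletableFuture<String> received = CompletableFuture.completedFuture("m1");
        CompletableFuture<Void> done = new CompletableFuture<>();
        try {
            // The caller is itself a task on the pinned thread.
            pinned.execute(() -> {
                if (wholeCallbackOnExecutor) {
                    // Whole callback scheduled on the executor: runs as one unit.
                    received.thenAcceptAsync(msg -> {
                        events.add("messageReceived");
                        events.add("pauseCheck");
                        done.complete(null);
                    }, pinned);
                } else {
                    // Callback runs inline; only messageReceived is rescheduled,
                    // so it is queued behind the currently running task.
                    received.thenAccept(msg -> {
                        pinned.execute(() -> {
                            events.add("messageReceived");
                            done.complete(null);
                        });
                        events.add("pauseCheck");
                    });
                }
            });
            done.join();
            return new ArrayList<>(events);
        } finally {
            pinned.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // prints [pauseCheck, messageReceived]
        System.out.println(run(true));  // prints [messageReceived, pauseCheck]
    }
}
```

In the first variant the pause decision is made before the message has been accounted for, which is the inversion the `thenAcceptAsync` change is meant to avoid.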
   
   ### Verifying this change
   
   Since this is a fix for a data race, it is hard to test the change. I think 
the change is small enough that we don't need to add new tests for it, but 
please let me know if you think otherwise.
   
   ### Does this pull request potentially affect one of the following parts:
   
   *If `yes` was chosen, please highlight the changes*
   
   * Dependencies (does it add or upgrade a dependency): (no)
   * The public API: (no)
   * The schema: (no)
   * The default values of configurations: (no)
   * The wire protocol: (no)
   * The rest endpoints: (no)
   * The admin cli options: (no)
   * Anything that affects deployment: (no)
   
   ### Documentation
   We should document this in release notes. No other docs need to be updated.
   
   
   

