michaeljmarshall opened a new pull request #12456: URL: https://github.com/apache/pulsar/pull/12456
### Motivation

@lhotari and I discovered a race condition in the `MultiTopicsConsumerImpl<T>` class. The race allows messages to be delivered out of order. We discovered the bug using the following steps:

1. Create a 100 partition topic.
2. Produce to the topic at 50k messages per second.
3. Consume from the topic using a single, exclusive consumer in a single thread.
4. Observe a small percentage of out of order messages (on average, 13 in 100,000).

However, the race can happen with a single message and on a topic with a single partition. The race is between these two code blocks:

https://github.com/apache/pulsar/blob/7ad46c8c18bb8365c9a2d1233a6cd58ecd6f541f/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L412-L416

https://github.com/apache/pulsar/blob/7ad46c8c18bb8365c9a2d1233a6cd58ecd6f541f/pulsar-client/src/main/java/org/apache/pulsar/client/impl/MultiTopicsConsumerImpl.java#L294-L301

The first block executes on the application's calling thread. The second block executes on either the `internalPinnedExecutor` or the topic partition consumer's `internalPinnedExecutor` (these are not necessarily the same thread). As such, if the two blocks run concurrently, it is possible for `incomingMessages.poll()` to return `null` while `nextPendingReceive()` also returns `null`. When that happens, the `MultiTopicsConsumerImpl` ends up with both a single message in the `incomingMessages` queue and a single pending receive in the `pendingReceives` queue, and the next message will then be delivered out of order.

This proposed solution follows the paradigm used by @Vanlightly in https://github.com/apache/pulsar/pull/11691. Essentially, every place that inspects both the `pendingReceives` and the `incomingMessages` queues must do so from a single thread: the `internalPinnedExecutor`.

### Modifications

* Run the callback for `consumer.receiveAsync()` on the `internalPinnedExecutor`.
  I chose to run the whole callback on the `internalPinnedExecutor` instead of just the `messageReceived` method. If we left the callback using `thenAccept` and only ran `messageReceived` on the `internalPinnedExecutor`, there is a chance that the callback would run on the calling or completing thread, which is not necessarily the `internalPinnedExecutor`. That would mean the `messageReceived` logic could actually run after the remaining callback logic that inspects `incomingMessages.size()` and decides whether or not to pause the consumer. By scheduling the callback on the `internalPinnedExecutor` using `thenAcceptAsync`, we guarantee that the code runs together, without the data race we're fixing in this PR.
* Run `MultiTopicsConsumerImpl#internalReceiveAsync` on the `internalPinnedExecutor`.
* Remove the `checkState(message instanceof TopicMessageImpl);` call from the `MultiTopicsConsumerImpl#internalReceiveAsync` method. This decision may be controversial. I removed the check because we only ever add `TopicMessageImpl` to the `MultiTopicsConsumerImpl`'s `incomingMessages` queue. If the check is necessary, we could instead complete the future exceptionally when the message is not of type `TopicMessageImpl`.

### Verifying this change

Since this is a fix for a data race, it is hard to test the change. I think the change is small enough that we don't need to add new tests for it, but please let me know if you think otherwise.

### Does this pull request potentially affect one of the following parts:

*If `yes` was chosen, please highlight the changes*

* Dependencies (does it add or upgrade a dependency): (no)
* The public API: (no)
* The schema: (no)
* The default values of configurations: (no)
* The wire protocol: (no)
* The rest endpoints: (no)
* The admin cli options: (no)
* Anything that affects deployment: (no)

### Documentation

We should document this in release notes. No other docs need to be updated.
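To illustrate the single-thread paradigm this PR adopts, here is a minimal, hypothetical model (not the actual Pulsar code; the class and field names only mirror the ones discussed above). Both the "message arrived" path and the "application called `receiveAsync`" path are funneled onto one single-threaded executor, so the two racing blocks can never interleave and the null/null race described in the Motivation cannot occur:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: all inspection of incomingMessages and pendingReceives happens on
// one single-threaded executor, so the two queues are always examined and
// mutated atomically with respect to each other.
public class PinnedExecutorSketch {
    private final ExecutorService internalPinnedExecutor = Executors.newSingleThreadExecutor();
    private final Queue<String> incomingMessages = new ArrayDeque<>();
    private final Queue<CompletableFuture<String>> pendingReceives = new ArrayDeque<>();

    // Called from any thread when a partition consumer delivers a message.
    public void messageReceived(String msg) {
        internalPinnedExecutor.execute(() -> {
            CompletableFuture<String> pending = pendingReceives.poll();
            if (pending != null) {
                pending.complete(msg);      // a caller was already waiting
            } else {
                incomingMessages.add(msg);  // queue it for a future receiveAsync
            }
        });
    }

    // Called from the application thread.
    public CompletableFuture<String> receiveAsync() {
        CompletableFuture<String> result = new CompletableFuture<>();
        internalPinnedExecutor.execute(() -> {
            String msg = incomingMessages.poll();
            if (msg != null) {
                result.complete(msg);         // a message was already queued
            } else {
                pendingReceives.add(result);  // wait for the next message
            }
        });
        return result;
    }

    public static void main(String[] args) {
        PinnedExecutorSketch s = new PinnedExecutorSketch();
        CompletableFuture<String> f1 = s.receiveAsync();
        CompletableFuture<String> f2 = s.receiveAsync();
        s.messageReceived("a");
        s.messageReceived("b");
        // Tasks run in submission order on the single thread, so delivery
        // order is preserved.
        System.out.println(f1.join() + f2.join()); // prints "ab"
        s.internalPinnedExecutor.shutdown();
    }
}
```

Because a single-threaded executor runs tasks in submission order, there is no interleaving in which both `poll()` calls see empty queues.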
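The `thenAccept` vs. `thenAcceptAsync` distinction behind the first modification can be demonstrated with a small standalone example (hypothetical thread names, not Pulsar code): with plain `thenAccept`, the callback may run on whichever thread completes the future, whereas `thenAcceptAsync` with an explicit executor always dispatches the callback to that executor:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Demonstrates that thenAcceptAsync(callback, executor) pins the callback to
// the given executor even when the future is completed from another thread.
public class CallbackThreadDemo {
    public static void main(String[] args) throws Exception {
        ExecutorService pinned = Executors.newSingleThreadExecutor(
                r -> new Thread(r, "pinned-executor"));

        CompletableFuture<String> fut = new CompletableFuture<>();
        CompletableFuture<Void> done = fut.thenAcceptAsync(
                msg -> System.out.println("ran on " + Thread.currentThread().getName()),
                pinned);

        // Complete the future from a differently named thread, standing in
        // for a topic partition consumer's thread.
        Thread completer = new Thread(() -> fut.complete("msg"), "partition-consumer");
        completer.start();
        completer.join();

        done.join();  // the callback ran on "pinned-executor", not "partition-consumer"
        pinned.shutdown();
    }
}
```

Had the example used `thenAccept` instead, the callback could have run on the `partition-consumer` thread, which is the ordering hazard described above.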
