equanz opened a new issue, #20844: URL: https://github.com/apache/pulsar/issues/20844
### Search before asking

- [X] I searched in the [issues](https://github.com/apache/pulsar/issues) and found nothing similar.

### Version

* OS: macOS 13.4.1
* Pulsar: current master https://github.com/apache/pulsar/commit/f5553a286d5a7795771c79f91f9ae8a412cf4683

### Minimal reproduce step

1. First, add a `Thread.sleep` so that `PersistentStickyKeyDispatcherMultipleConsumers#addConsumer` stalls long enough to reproduce the issue.

```diff
% git --no-pager diff pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 8f05530f58b..87a496040d0 100644
--- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -121,6 +121,11 @@ public class PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
                     return result;
                 })
         ).thenRun(() -> {
+            try {
+                Thread.sleep(30000);
+            } catch (Exception e) {
+                log.error("Exception", e);
+            }
             synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this) {
                 PositionImpl readPositionWhenJoining = (PositionImpl) cursor.getReadPosition();
                 consumer.setReadPositionWhenJoining(readPositionWhenJoining);
```

2. Connect a consumer (Key_Shared) and wait for the connection to complete.
3. Connect a second consumer (Key_Shared) to the same subscription.
   - Don't wait for the connection to complete.
4. Send two messages to the same topic concurrently.
   - Each message must be sent with its own send operation (e.g. `Producer#send`).

### What did you expect to see?

Both messages are persisted to the topic without waiting for the new consumer to finish connecting.
### What did you see instead?

The second message is persisted to the topic only after the new consumer has finished connecting.

### Anything else?

#### Root cause

Writing a new entry and reading entries run on the same thread via `OrderedExecutor`.

#### Detailed descriptions

When persisting a message, `ManagedLedgerImpl#asyncAddEntry` runs the asynchronous add operation on `ManagedLedgerImpl#executor`. This executor instance is a `SingleThreadExecutor`.

https://github.com/apache/pulsar/blob/f5553a286d5a7795771c79f91f9ae8a412cf4683/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java#L776-L779

Shortly afterwards, the same thread tries to call `PersistentDispatcherMultipleConsumers#readEntriesComplete`.

https://github.com/apache/pulsar/blob/f5553a286d5a7795771c79f91f9ae8a412cf4683/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L544

However, this method cannot run immediately, because it and `PersistentDispatcherMultipleConsumers#addConsumer` are both synchronized. It must therefore wait for `PersistentDispatcherMultipleConsumers#addConsumer` to complete, and the executor thread stays blocked while it waits.

As a result, when a second message is persisted at the same time, its add operation cannot get the thread until the blocked call above has finished.

#### Notes

I think this is unexpected behavior, but it is not a critical issue.

### Are you willing to submit a PR?

- [ ] I'm willing to submit a PR!
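
The blocking pattern described under "Root cause" can be illustrated outside Pulsar with a JDK-only sketch. This is not broker code: `SingleThreadStallDemo`, `dispatcherMonitor`, `ledgerExecutor`, and the event labels are hypothetical stand-ins for the dispatcher's monitor, the managed ledger's single-thread executor, and the operations involved. It assumes only what the description states: writes and the read callback share one thread, and the read callback needs the same monitor as `addConsumer`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the broker components; none of this is Pulsar code.
class SingleThreadStallDemo {

    // Returns the order in which the simulated operations completed.
    static List<String> run(long joinMillis) {
        List<String> events = Collections.synchronizedList(new ArrayList<>());
        Object dispatcherMonitor = new Object();          // plays the dispatcher's monitor
        ExecutorService ledgerExecutor = Executors.newSingleThreadExecutor();
        CountDownLatch monitorHeld = new CountDownLatch(1);

        // Plays the slow addConsumer: holds the monitor for joinMillis.
        Thread joiningConsumer = new Thread(() -> {
            synchronized (dispatcherMonitor) {
                monitorHeld.countDown();
                try {
                    Thread.sleep(joinMillis);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                events.add("addConsumer done");
            }
        });

        try {
            joiningConsumer.start();
            monitorHeld.await();                          // the monitor is now taken

            // First write: runs right away on the single thread.
            ledgerExecutor.execute(() -> events.add("write-1 persisted"));
            // Read callback: needs the monitor, so it parks the only thread.
            ledgerExecutor.execute(() -> {
                synchronized (dispatcherMonitor) {
                    events.add("read callback ran");
                }
            });
            // Second write: queued behind the blocked read callback.
            ledgerExecutor.execute(() -> events.add("write-2 persisted"));

            ledgerExecutor.shutdown();
            if (!ledgerExecutor.awaitTermination(joinMillis + 10_000, TimeUnit.MILLISECONDS)) {
                throw new IllegalStateException("simulated tasks did not finish");
            }
            joiningConsumer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return new ArrayList<>(events);
    }

    public static void main(String[] args) {
        run(500).forEach(System.out::println);
    }
}
```

In this sketch the second write never touches the monitor itself, yet it is always recorded after `addConsumer done`, because the only executor thread is parked inside the read callback. That matches the observed behavior, where the second message is persisted only after the new consumer finishes connecting.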
