This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.10 by this push:
new aeec16f65df Fixed deadlock in key-shared dispatcher (#16660)
aeec16f65df is described below
commit aeec16f65df603e06ee8a7a4f5aacba54d5096b4
Author: Matteo Merli <[email protected]>
AuthorDate: Mon Jul 18 17:43:17 2022 -0700
Fixed deadlock in key-shared dispatcher (#16660)
---
...rsistentStickyKeyDispatcherMultipleConsumers.java | 20 +++++++++++++-------
1 file changed, 13 insertions(+), 7 deletions(-)
diff --git
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
index 3e01531fc3e..0a402d8322a 100644
---
a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
+++
b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentStickyKeyDispatcherMultipleConsumers.java
@@ -403,13 +403,19 @@ public class
PersistentStickyKeyDispatcherMultipleConsumers extends PersistentDi
}
@Override
- public synchronized void markDeletePositionMoveForward() {
- if (recentlyJoinedConsumers != null &&
!recentlyJoinedConsumers.isEmpty()
- && removeConsumersFromRecentJoinedConsumers()) {
- // After we process acks, we need to check whether the mark-delete
position was advanced and we can finally
- // read more messages. It's safe to call readMoreEntries()
multiple times.
- readMoreEntries();
- }
+ public void markDeletePositionMoveForward() {
+ // Execute the notification in different thread to avoid a mutex chain
here
+ // from the delete operation that was completed
+ topic.getBrokerService().getTopicOrderedExecutor().execute(() -> {
+ synchronized (PersistentStickyKeyDispatcherMultipleConsumers.this)
{
+ if (recentlyJoinedConsumers != null &&
!recentlyJoinedConsumers.isEmpty()
+ && removeConsumersFromRecentJoinedConsumers()) {
+ // After we process acks, we need to check whether the
mark-delete position was advanced and we
+ // can finally read more messages. It's safe to call
readMoreEntries() multiple times.
+ readMoreEntries();
+ }
+ }
+ });
}
private boolean removeConsumersFromRecentJoinedConsumers() {