This is an automated email from the ASF dual-hosted git repository.
michaelpearce pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/master by this push:
new fafbd7e ARTEMIS-1604 Artemis deadlock using MQTT Protocol
new 201d76b This closes #2580
fafbd7e is described below
commit fafbd7e2e5953e03573088577be620828cd77bc5
Author: Michael André Pearce <[email protected]>
AuthorDate: Tue Mar 12 19:53:07 2019 +0000
ARTEMIS-1604 Artemis deadlock using MQTT Protocol
Address code review comment not addressed when PR was merged.
---
.../artemis/core/server/impl/QueueImpl.java | 80 ++++++++++++----------
1 file changed, 42 insertions(+), 38 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
index 7e736ca..47cd68e 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/QueueImpl.java
@@ -3094,60 +3094,64 @@ public class QueueImpl extends CriticalComponentImpl implements Queue {
return false;
}
try {
- synchronized (this) {
- if (!supportsDirectDeliver) {
- return false;
- }
- if (paused || !canDispatch() && redistributor == null) {
- return false;
- }
-
- if (checkExpired(ref)) {
- return true;
- }
+ return deliver(ref);
+ } finally {
+ deliverLock.unlock();
+ }
+ }
- consumers.reset();
+ private boolean deliver(final MessageReference ref) {
+ synchronized (this) {
+ if (!supportsDirectDeliver) {
+ return false;
+ }
+ if (paused || !canDispatch() && redistributor == null) {
+ return false;
+ }
- while (consumers.hasNext() || redistributor != null) {
+ if (checkExpired(ref)) {
+ return true;
+ }
- ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
- Consumer consumer = holder.consumer;
+ consumers.reset();
- final SimpleString groupID = extractGroupID(ref);
- Consumer groupConsumer = getGroupConsumer(groupID);
+ while (consumers.hasNext() || redistributor != null) {
- if (groupConsumer != null) {
- consumer = groupConsumer;
- }
+ ConsumerHolder<? extends Consumer> holder = redistributor == null ? consumers.next() : redistributor;
+ Consumer consumer = holder.consumer;
- HandleStatus status = handle(ref, consumer);
+ final SimpleString groupID = extractGroupID(ref);
+ Consumer groupConsumer = getGroupConsumer(groupID);
- if (status == HandleStatus.HANDLED) {
+ if (groupConsumer != null) {
+ consumer = groupConsumer;
+ }
- if (redistributor == null) {
- handleMessageGroup(ref, consumer, groupConsumer, groupID);
- }
+ HandleStatus status = handle(ref, consumer);
- messagesAdded.incrementAndGet();
+ if (status == HandleStatus.HANDLED) {
- deliveriesInTransit.countUp();
- proceedDeliver(consumer, ref);
- consumers.reset();
- return true;
+ if (redistributor == null) {
+ handleMessageGroup(ref, consumer, groupConsumer, groupID);
}
- if (redistributor != null || groupConsumer != null) {
- break;
- }
+ messagesAdded.incrementAndGet();
+
+ deliveriesInTransit.countUp();
+ proceedDeliver(consumer, ref);
+ consumers.reset();
+ return true;
}
- if (logger.isTraceEnabled()) {
- logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
+ if (redistributor != null || groupConsumer != null) {
+ break;
}
- return false;
}
- } finally {
- deliverLock.unlock();
+
+ if (logger.isTraceEnabled()) {
+ logger.tracef("Queue " + getName() + " is out of direct delivery as no consumers handled a delivery");
+ }
+ return false;
}
}