NO-JIRA Fixing deadlock on MQTTConnection found during a testsuite run
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5480d7bc Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5480d7bc Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5480d7bc Branch: refs/heads/master Commit: 5480d7bc5b5dc9ab14995c89a1df8868ada720ce Parents: b5bf5af Author: Clebert Suconic <clebertsuco...@apache.org> Authored: Tue Feb 13 14:40:37 2018 -0500 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Wed Feb 14 10:59:23 2018 -0500 ---------------------------------------------------------------------- .../core/protocol/mqtt/MQTTConnection.java | 44 ++++++++++---------- 1 file changed, 23 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5480d7bc/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java index eb3b4b1..012356b 100644 --- a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java +++ b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java @@ -18,8 +18,8 @@ package org.apache.activemq.artemis.core.protocol.mqtt; import java.util.ArrayList; -import java.util.Collections; import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; @@ -47,9 +47,9 @@ public class 
MQTTConnection implements RemotingConnection { private String clientID; - private final List<FailureListener> failureListeners = Collections.synchronizedList(new ArrayList<FailureListener>()); + private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<>(); - private final List<CloseListener> closeListeners = Collections.synchronizedList(new ArrayList<CloseListener>()); + private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<>(); public MQTTConnection(Connection transportConnection) throws Exception { this.transportConnection = transportConnection; @@ -100,15 +100,14 @@ public class MQTTConnection implements RemotingConnection { @Override public List<CloseListener> removeCloseListeners() { - synchronized (closeListeners) { - List<CloseListener> deletedCloseListeners = new ArrayList<>(closeListeners); - closeListeners.clear(); - return deletedCloseListeners; - } + List<CloseListener> deletedCloseListeners = copyCloseListeners(); + closeListeners.clear(); + return deletedCloseListeners; } @Override public void setCloseListeners(List<CloseListener> listeners) { + closeListeners.clear(); closeListeners.addAll(listeners); } @@ -119,19 +118,15 @@ public class MQTTConnection implements RemotingConnection { @Override public List<FailureListener> removeFailureListeners() { - synchronized (failureListeners) { - List<FailureListener> deletedFailureListeners = new ArrayList<>(failureListeners); - failureListeners.clear(); - return deletedFailureListeners; - } + List<FailureListener> deletedFailureListeners = copyFailureListeners(); + failureListeners.clear(); + return deletedFailureListeners; } @Override public void setFailureListeners(List<FailureListener> listeners) { - synchronized (failureListeners) { - failureListeners.clear(); - failureListeners.addAll(listeners); - } + failureListeners.clear(); + failureListeners.addAll(listeners); } @Override @@ -141,13 +136,20 @@ public class MQTTConnection implements RemotingConnection { 
@Override public void fail(ActiveMQException me) { - synchronized (failureListeners) { - for (FailureListener listener : failureListeners) { - listener.connectionFailed(me, false); - } + List<FailureListener> copy = copyFailureListeners(); + for (FailureListener listener : copy) { + listener.connectionFailed(me, false); } } + private List<FailureListener> copyFailureListeners() { + return new ArrayList<>(failureListeners); + } + + private List<CloseListener> copyCloseListeners() { + return new ArrayList<>(closeListeners); + } + @Override public void fail(ActiveMQException me, String scaleDownTargetNodeID) { synchronized (failureListeners) {