NO-JIRA Fixing Deadlock on MQTTConnection

Found during a testsuite run


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/5480d7bc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/5480d7bc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/5480d7bc

Branch: refs/heads/master
Commit: 5480d7bc5b5dc9ab14995c89a1df8868ada720ce
Parents: b5bf5af
Author: Clebert Suconic <clebertsuco...@apache.org>
Authored: Tue Feb 13 14:40:37 2018 -0500
Committer: Clebert Suconic <clebertsuco...@apache.org>
Committed: Wed Feb 14 10:59:23 2018 -0500

----------------------------------------------------------------------
 .../core/protocol/mqtt/MQTTConnection.java      | 44 ++++++++++----------
 1 file changed, 23 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/5480d7bc/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
index eb3b4b1..012356b 100644
--- 
a/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
+++ 
b/artemis-protocols/artemis-mqtt-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/mqtt/MQTTConnection.java
@@ -18,8 +18,8 @@
 package org.apache.activemq.artemis.core.protocol.mqtt;
 
 import java.util.ArrayList;
-import java.util.Collections;
 import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
@@ -47,9 +47,9 @@ public class MQTTConnection implements RemotingConnection {
 
    private String clientID;
 
-   private final List<FailureListener> failureListeners = 
Collections.synchronizedList(new ArrayList<FailureListener>());
+   private final List<FailureListener> failureListeners = new 
CopyOnWriteArrayList<>();
 
-   private final List<CloseListener> closeListeners = 
Collections.synchronizedList(new ArrayList<CloseListener>());
+   private final List<CloseListener> closeListeners = new 
CopyOnWriteArrayList<>();
 
    public MQTTConnection(Connection transportConnection) throws Exception {
       this.transportConnection = transportConnection;
@@ -100,15 +100,14 @@ public class MQTTConnection implements RemotingConnection 
{
 
    @Override
    public List<CloseListener> removeCloseListeners() {
-      synchronized (closeListeners) {
-         List<CloseListener> deletedCloseListeners = new 
ArrayList<>(closeListeners);
-         closeListeners.clear();
-         return deletedCloseListeners;
-      }
+      List<CloseListener> deletedCloseListeners = copyCloseListeners();
+      closeListeners.clear();
+      return deletedCloseListeners;
    }
 
    @Override
    public void setCloseListeners(List<CloseListener> listeners) {
+      closeListeners.clear();
       closeListeners.addAll(listeners);
    }
 
@@ -119,19 +118,15 @@ public class MQTTConnection implements RemotingConnection 
{
 
    @Override
    public List<FailureListener> removeFailureListeners() {
-      synchronized (failureListeners) {
-         List<FailureListener> deletedFailureListeners = new 
ArrayList<>(failureListeners);
-         failureListeners.clear();
-         return deletedFailureListeners;
-      }
+      List<FailureListener> deletedFailureListeners = copyFailureListeners();
+      failureListeners.clear();
+      return deletedFailureListeners;
    }
 
    @Override
    public void setFailureListeners(List<FailureListener> listeners) {
-      synchronized (failureListeners) {
-         failureListeners.clear();
-         failureListeners.addAll(listeners);
-      }
+      failureListeners.clear();
+      failureListeners.addAll(listeners);
    }
 
    @Override
@@ -141,13 +136,20 @@ public class MQTTConnection implements RemotingConnection 
{
 
    @Override
    public void fail(ActiveMQException me) {
-      synchronized (failureListeners) {
-         for (FailureListener listener : failureListeners) {
-            listener.connectionFailed(me, false);
-         }
+      List<FailureListener> copy = copyFailureListeners();
+      for (FailureListener listener : copy) {
+         listener.connectionFailed(me, false);
       }
    }
 
+   private List<FailureListener> copyFailureListeners() {
+      return new ArrayList<>(failureListeners);
+   }
+
+   private List<CloseListener> copyCloseListeners() {
+      return new ArrayList<>(closeListeners);
+   }
+
    @Override
    public void fail(ActiveMQException me, String scaleDownTargetNodeID) {
       synchronized (failureListeners) {

Reply via email to