Author: kwall
Date: Thu Feb  9 17:34:24 2012
New Revision: 1242408

URL: http://svn.apache.org/viewvc?rev=1242408&view=rev
Log:
QPID-3823: ServerSession.unblock(AMQQueue) can cause an NPE when trying to remove 
a queue that is not present in the _blockingQueues map

Applied patch from Andrew MacBean <[email protected]> and Philip Harvey 
<[email protected]>

Modified:
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
    
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?rev=1242408&r1=1242407&r2=1242408&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
 Thu Feb  9 17:34:24 2012
@@ -20,8 +20,22 @@
  */
 package org.apache.qpid.server;
 
-import org.apache.log4j.Logger;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.framing.AMQMethodBody;
@@ -79,21 +93,6 @@ import org.apache.qpid.server.txn.Server
 import org.apache.qpid.server.virtualhost.VirtualHost;
 import org.apache.qpid.transport.TransportException;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class AMQChannel implements SessionConfig, AMQSessionModel, 
AsyncAutoCommitTransaction.FutureRecorder
 {
     public static final int DEFAULT_PREFETCH = 4096;
@@ -155,7 +154,7 @@ public class AMQChannel implements Sessi
     private final AMQProtocolSession _session;
     private AtomicBoolean _closing = new AtomicBoolean(false);
 
-    private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new 
ConcurrentHashMap<AMQQueue, Boolean>();
+    private final Set<AMQQueue> _blockingQueues = new 
ConcurrentSkipListSet<AMQQueue>();
 
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
 
@@ -1363,7 +1362,7 @@ public class AMQChannel implements Sessi
 
     public void block(AMQQueue queue)
     {
-        if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+        if(_blockingQueues.add(queue))
         {
 
             if(_blocking.compareAndSet(false,true))
@@ -1616,4 +1615,9 @@ public class AMQChannel implements Sessi
         }
     }
 
+    @Override
+    public int compareTo(AMQSessionModel session)
+    {
+        return getId().toString().compareTo(session.getID().toString());
+    }
 }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java?rev=1242408&r1=1242407&r2=1242408&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
 Thu Feb  9 17:34:24 2012
@@ -20,11 +20,19 @@
  */
 package org.apache.qpid.server.protocol;
 
+import java.util.concurrent.ConcurrentSkipListSet;
+
 import org.apache.qpid.AMQException;
 import org.apache.qpid.server.logging.LogSubject;
 import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.SimpleAMQQueue;
 
-public interface AMQSessionModel
+/**
+ * Session model interface.
+ * Extends {@link Comparable} to allow objects to be inserted into a {@link 
ConcurrentSkipListSet}
+ * when monitoring the blocking and unblocking of queues/sessions in {@link 
SimpleAMQQueue}.
+ */
+public interface AMQSessionModel extends Comparable<AMQSessionModel>
 {
     public Object getID();
 

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java?rev=1242408&r1=1242407&r2=1242408&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
 Thu Feb  9 17:34:24 2012
@@ -18,8 +18,23 @@
  */
 package org.apache.qpid.server.queue;
 
-import org.apache.log4j.Logger;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.management.JMException;
 
+import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQSecurityException;
 import org.apache.qpid.framing.AMQShortString;
@@ -52,22 +67,6 @@ import org.apache.qpid.server.txn.LocalT
 import org.apache.qpid.server.txn.ServerTransaction;
 import org.apache.qpid.server.virtualhost.VirtualHost;
 
-import javax.management.JMException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.EnumSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.Executor;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
-
 public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, 
MessageGroupManager.SubscriptionResetHelper
 {
     private static final Logger _logger = 
Logger.getLogger(SimpleAMQQueue.class);
@@ -165,7 +164,7 @@ public class SimpleAMQQueue implements A
     private AtomicInteger _deliveredMessages = new AtomicInteger();
     private AtomicBoolean _stopped = new AtomicBoolean(false);
 
-    private final ConcurrentMap<AMQSessionModel, Boolean> _blockedChannels = 
new ConcurrentHashMap<AMQSessionModel, Boolean>();
+    private final Set<AMQSessionModel> _blockedChannels = new 
ConcurrentSkipListSet<AMQSessionModel>();
 
     private final AtomicBoolean _deleted = new AtomicBoolean(false);
     private final List<Task> _deleteTaskList = new 
CopyOnWriteArrayList<Task>();
@@ -1629,7 +1628,7 @@ public class SimpleAMQQueue implements A
                 //Overfull log message
                 _logActor.message(_logSubject, 
QueueMessages.OVERFULL(_atomicQueueSize.get(), _capacity));
 
-                _blockedChannels.putIfAbsent(channel, Boolean.TRUE);
+                _blockedChannels.add(channel);
 
                 channel.block(this);
 
@@ -1662,11 +1661,10 @@ public class SimpleAMQQueue implements A
                     _logActor.message(_logSubject, 
QueueMessages.UNDERFULL(_atomicQueueSize.get(), _flowResumeCapacity));
                 }
 
-
-                for(AMQSessionModel c : _blockedChannels.keySet())
+                for(final AMQSessionModel blockedChannel : _blockedChannels)
                 {
-                    c.unblock(this);
-                    _blockedChannels.remove(c);
+                    blockedChannel.unblock(this);
+                    _blockedChannels.remove(blockedChannel);
                 }
             }
         }

Modified: 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java?rev=1242408&r1=1242407&r2=1242408&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
 (original)
+++ 
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
 Thu Feb  9 17:34:24 2012
@@ -20,8 +20,28 @@
  */
 package org.apache.qpid.server.transport;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static 
org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
+import static org.apache.qpid.util.Serial.gt;
+
+import java.security.Principal;
+import java.text.MessageFormat;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.ConcurrentSkipListSet;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.security.auth.Subject;
 
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
@@ -65,27 +85,8 @@ import org.apache.qpid.transport.RangeSe
 import org.apache.qpid.transport.RangeSetFactory;
 import org.apache.qpid.transport.Session;
 import org.apache.qpid.transport.SessionDelegate;
-
-import static 
org.apache.qpid.server.logging.subjects.LogSubjectFormat.CHANNEL_FORMAT;
-import static org.apache.qpid.util.Serial.gt;
-
-import javax.security.auth.Subject;
-import java.security.Principal;
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.SortedMap;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class ServerSession extends Session 
         implements AuthorizationHolder, SessionConfig, 
@@ -102,7 +103,7 @@ public class ServerSession extends Sessi
     private long _createTime = System.currentTimeMillis();
     private LogActor _actor = GenericActor.getInstance(this);
 
-    private final ConcurrentMap<AMQQueue, Boolean> _blockingQueues = new 
ConcurrentHashMap<AMQQueue, Boolean>();
+    private final Set<AMQQueue> _blockingQueues = new 
ConcurrentSkipListSet<AMQQueue>();
 
     private final AtomicBoolean _blocking = new AtomicBoolean(false);
     private ChannelLogSubject _logSubject;
@@ -684,7 +685,8 @@ public class ServerSession extends Sessi
 
     public void block(AMQQueue queue)
     {
-        if(_blockingQueues.putIfAbsent(queue, Boolean.TRUE) == null)
+
+        if(_blockingQueues.add(queue))
         {
 
             if(_blocking.compareAndSet(false,true))
@@ -897,4 +899,10 @@ public class ServerSession extends Sessi
             return _future.isComplete();
         }
     }
+
+    @Override
+    public int compareTo(AMQSessionModel session)
+    {
+        return getId().toString().compareTo(session.getID().toString());
+    }
 }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:[email protected]

Reply via email to