Author: kwall
Date: Mon Nov 16 18:09:21 2015
New Revision: 1714647

URL: http://svn.apache.org/viewvc?rev=1714647&view=rev
Log:
QPID-6861: Avoid unsafe read of AMQPConnection_0_8#_channelMap

Modified:
    qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java

Modified: qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1714647&r1=1714646&r2=1714647&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Mon Nov 16 18:09:21 2015
@@ -32,7 +32,6 @@ import java.security.Principal;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
@@ -108,9 +107,6 @@ public class AMQPConnection_0_8
 
     private static final Logger _logger = LoggerFactory.getLogger(AMQPConnection_0_8.class);
 
-    // to save boxing the channelId and looking up in a map... cache in an array the low numbered
-    // channels.  This value must be of the form 2^x - 1.
-    private static final int CHANNEL_CACHE_SIZE = 0xff;
     private static final String BROKER_DEBUG_BINARY_DATA_LENGTH = "broker.debug.binaryDataLength";
     private static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
 
@@ -121,10 +117,8 @@ public class AMQPConnection_0_8
 
     private volatile VirtualHostImpl<?,?,?> _virtualHost;
 
-    private final Map<Integer, AMQChannel> _channelMap =
-            new HashMap<>();
-
-    private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+    private final Object _channelAddRemoveLock = new Object();
+    private final Map<Integer, AMQChannel> _channelMap = new ConcurrentHashMap<>();
 
     private ConnectionState _state = ConnectionState.INIT;
 
@@ -159,6 +153,7 @@ public class AMQPConnection_0_8
     private final ByteBufferSender _sender;
 
     private volatile boolean _deferFlush;
+    /** Guarded by _channelAddRemoveLock */
     private boolean _blocking;
 
     private volatile boolean _closeWhenNoRoute;
@@ -416,18 +411,9 @@ public class AMQPConnection_0_8
         }
     }
 
-    private List<AMQChannel> getChannels()
-    {
-        synchronized (_channelMap)
-        {
-            return new ArrayList<>(_channelMap.values());
-        }
-    }
-
     public AMQChannel getChannel(int channelId)
     {
-        final AMQChannel channel =
-                ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+        final AMQChannel channel = _channelMap.get(channelId);
         if ((channel == null) || channel.isClosing())
         {
             return null;
@@ -445,9 +431,7 @@ public class AMQPConnection_0_8
 
     private void addChannel(AMQChannel channel)
     {
-        final int channelId = channel.getChannelId();
-
-        synchronized (_channelMap)
+        synchronized (_channelAddRemoveLock)
         {
             _channelMap.put(channel.getChannelId(), channel);
             sessionAdded(channel);
@@ -456,11 +440,16 @@ public class AMQPConnection_0_8
                 channel.block();
             }
         }
+    }
 
-        if (((channelId & CHANNEL_CACHE_SIZE) == channelId))
+    private void removeChannel(int channelId)
+    {
+        AMQChannel session;
+        synchronized (_channelAddRemoveLock)
         {
-            _cachedChannels[channelId] = channel;
+            session = _channelMap.remove(channelId);
         }
+        sessionRemoved(session);
     }
 
     public long getMaximumNumberOfChannels()
@@ -527,25 +516,6 @@ public class AMQPConnection_0_8
         _closingChannelsList.put(channelId, System.currentTimeMillis());
     }
 
-    /**
-     * In our current implementation this is used by the clustering code.
-     *
-     * @param channelId The channel to remove
-     */
-    public void removeChannel(int channelId)
-    {
-        AMQChannel session;
-        synchronized (_channelMap)
-        {
-            session = _channelMap.remove(channelId);
-            if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
-            {
-                _cachedChannels[channelId] = null;
-            }
-        }
-        sessionRemoved(session);
-    }
-
     private void initHeartbeats(int delay)
     {
         if (delay > 0)
@@ -565,7 +535,7 @@ public class AMQPConnection_0_8
         try
         {
             RuntimeException firstException = null;
-            for (AMQChannel channel : getChannels())
+            for (AMQChannel channel : getSessionModels())
             {
                 try
                 {
@@ -588,15 +558,10 @@ public class AMQPConnection_0_8
         }
         finally
         {
-            synchronized (_channelMap)
+            synchronized (_channelAddRemoveLock)
             {
                 _channelMap.clear();
             }
-            for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
-            {
-                _cachedChannels[i] = null;
-            }
-
         }
     }
 
@@ -944,7 +909,7 @@ public class AMQPConnection_0_8
 
     public void block()
     {
-        synchronized (_channelMap)
+        synchronized (_channelAddRemoveLock)
         {
             if(!_blocking)
             {
@@ -959,7 +924,7 @@ public class AMQPConnection_0_8
 
     public void unblock()
     {
-        synchronized (_channelMap)
+        synchronized (_channelAddRemoveLock)
         {
             if(_blocking)
             {
@@ -975,7 +940,7 @@ public class AMQPConnection_0_8
     @Override
     public List<AMQChannel> getSessionModels()
     {
-               return new ArrayList<>(getChannels());
+        return new ArrayList<>(_channelMap.values());
     }
 
     @Override



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to