Author: kwall
Date: Mon Nov 16 18:09:21 2015
New Revision: 1714647
URL: http://svn.apache.org/viewvc?rev=1714647&view=rev
Log:
QPID-6861: Avoid unsafe read of AMQPConnection_0_8#_channelMap
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
Modified:
qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java
URL:
http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java?rev=1714647&r1=1714646&r2=1714647&view=diff
==============================================================================
--- qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java (original)
+++ qpid/java/trunk/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQPConnection_0_8.java Mon Nov 16 18:09:21 2015
@@ -32,7 +32,6 @@ import java.security.Principal;
import java.security.PrivilegedAction;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -108,9 +107,6 @@ public class AMQPConnection_0_8
private static final Logger _logger =
LoggerFactory.getLogger(AMQPConnection_0_8.class);
- // to save boxing the channelId and looking up in a map... cache in an array the low numbered
- // channels. This value must be of the form 2^x - 1.
- private static final int CHANNEL_CACHE_SIZE = 0xff;
private static final String BROKER_DEBUG_BINARY_DATA_LENGTH =
"broker.debug.binaryDataLength";
private static final int DEFAULT_DEBUG_BINARY_DATA_LENGTH = 80;
@@ -121,10 +117,8 @@ public class AMQPConnection_0_8
private volatile VirtualHostImpl<?,?,?> _virtualHost;
- private final Map<Integer, AMQChannel> _channelMap =
- new HashMap<>();
-
- private final AMQChannel[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1];
+ private final Object _channelAddRemoveLock = new Object();
+ private final Map<Integer, AMQChannel> _channelMap = new ConcurrentHashMap<>();
private ConnectionState _state = ConnectionState.INIT;
@@ -159,6 +153,7 @@ public class AMQPConnection_0_8
private final ByteBufferSender _sender;
private volatile boolean _deferFlush;
+ /** Guarded by _channelAddRemoveLock */
private boolean _blocking;
private volatile boolean _closeWhenNoRoute;
@@ -416,18 +411,9 @@ public class AMQPConnection_0_8
}
}
- private List<AMQChannel> getChannels()
- {
- synchronized (_channelMap)
- {
- return new ArrayList<>(_channelMap.values());
- }
- }
-
public AMQChannel getChannel(int channelId)
{
- final AMQChannel channel =
- ((channelId & CHANNEL_CACHE_SIZE) == channelId) ? _cachedChannels[channelId] : _channelMap.get(channelId);
+ final AMQChannel channel = _channelMap.get(channelId);
if ((channel == null) || channel.isClosing())
{
return null;
@@ -445,9 +431,7 @@ public class AMQPConnection_0_8
private void addChannel(AMQChannel channel)
{
- final int channelId = channel.getChannelId();
-
- synchronized (_channelMap)
+ synchronized (_channelAddRemoveLock)
{
_channelMap.put(channel.getChannelId(), channel);
sessionAdded(channel);
@@ -456,11 +440,16 @@ public class AMQPConnection_0_8
channel.block();
}
}
+ }
- if (((channelId & CHANNEL_CACHE_SIZE) == channelId))
+ private void removeChannel(int channelId)
+ {
+ AMQChannel session;
+ synchronized (_channelAddRemoveLock)
{
- _cachedChannels[channelId] = channel;
+ session = _channelMap.remove(channelId);
}
+ sessionRemoved(session);
}
public long getMaximumNumberOfChannels()
@@ -527,25 +516,6 @@ public class AMQPConnection_0_8
_closingChannelsList.put(channelId, System.currentTimeMillis());
}
- /**
- * In our current implementation this is used by the clustering code.
- *
- * @param channelId The channel to remove
- */
- public void removeChannel(int channelId)
- {
- AMQChannel session;
- synchronized (_channelMap)
- {
- session = _channelMap.remove(channelId);
- if ((channelId & CHANNEL_CACHE_SIZE) == channelId)
- {
- _cachedChannels[channelId] = null;
- }
- }
- sessionRemoved(session);
- }
-
private void initHeartbeats(int delay)
{
if (delay > 0)
@@ -565,7 +535,7 @@ public class AMQPConnection_0_8
try
{
RuntimeException firstException = null;
- for (AMQChannel channel : getChannels())
+ for (AMQChannel channel : getSessionModels())
{
try
{
@@ -588,15 +558,10 @@ public class AMQPConnection_0_8
}
finally
{
- synchronized (_channelMap)
+ synchronized (_channelAddRemoveLock)
{
_channelMap.clear();
}
- for (int i = 0; i <= CHANNEL_CACHE_SIZE; i++)
- {
- _cachedChannels[i] = null;
- }
-
}
}
@@ -944,7 +909,7 @@ public class AMQPConnection_0_8
public void block()
{
- synchronized (_channelMap)
+ synchronized (_channelAddRemoveLock)
{
if(!_blocking)
{
@@ -959,7 +924,7 @@ public class AMQPConnection_0_8
public void unblock()
{
- synchronized (_channelMap)
+ synchronized (_channelAddRemoveLock)
{
if(_blocking)
{
@@ -975,7 +940,7 @@ public class AMQPConnection_0_8
@Override
public List<AMQChannel> getSessionModels()
{
- return new ArrayList<>(getChannels());
+ return new ArrayList<>(_channelMap.values());
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]