Repository: activemq Updated Branches: refs/heads/trunk 97c127d2d -> 62c20ebdc
Apply fix and add test for: https://issues.apache.org/jira/browse/AMQ-5385 Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/62c20ebd Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/62c20ebd Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/62c20ebd Branch: refs/heads/trunk Commit: 62c20ebdcfcf4307a4379b39d8352891f8824d56 Parents: 97c127d Author: Timothy Bish <[email protected]> Authored: Thu Oct 9 12:40:13 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Thu Oct 9 12:40:13 2014 -0400 ---------------------------------------------------------------------- .../activemq/broker/region/RegionBroker.java | 30 +++++++------- .../activemq/transport/mqtt/MQTTTest.java | 41 ++++++++++++++++++++ 2 files changed, 56 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/62c20ebd/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java ---------------------------------------------------------------------- diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java index fb7d69e..4ebcef5 100755 --- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java +++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/RegionBroker.java @@ -245,23 +245,23 @@ public class RegionBroker extends EmptyBroker { synchronized (clientIdSet) { ConnectionContext oldContext = clientIdSet.get(clientId); if (oldContext != null) { - if (context.isAllowLinkStealing()){ - clientIdSet.remove(clientId); - if (oldContext.getConnection() != null) { - Connection connection = oldContext.getConnection(); - LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection()); - if (connection instanceof TransportConnection){ + if (context.isAllowLinkStealing()) { + clientIdSet.put(clientId, context); + if (oldContext.getConnection() != null) { + Connection connection = oldContext.getConnection(); + LOG.warn("Stealing link for clientId {} From Connection {}", clientId, oldContext.getConnection()); + if (connection instanceof TransportConnection) { TransportConnection transportConnection = (TransportConnection) connection; - transportConnection.stopAsync(); - }else{ - connection.stop(); - } - }else{ - LOG.error("Not Connection for {}", oldContext); - } - }else{ + transportConnection.stopAsync(); + } else { + connection.stop(); + } + } else { + LOG.error("No Connection found for {}", oldContext); + } + } else { throw new InvalidClientIDException("Broker: " + getBrokerName() + " - Client: " + clientId + " already connected from " - + oldContext.getConnection().getRemoteAddress()); + + oldContext.getConnection().getRemoteAddress()); } } else { clientIdSet.put(clientId, context); http://git-wip-us.apache.org/repos/asf/activemq/blob/62c20ebd/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java ---------------------------------------------------------------------- diff --git a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java index 1586ff4..32f8167 100644 --- a/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java +++ b/activemq-mqtt/src/test/java/org/apache/activemq/transport/mqtt/MQTTTest.java @@ -34,6 +34,7 @@ import java.util.Random; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; import java.util.regex.Pattern; import javax.jms.BytesMessage; @@ -1227,6 +1228,46 @@ public class MQTTTest extends MQTTTestSupport { connection2.disconnect(); } + @Test(timeout = 60 * 1000) + public void testRepeatedLinkStealing() throws Exception { + final String clientId = "duplicateClient"; + final AtomicReference<BlockingConnection> oldConnection = new AtomicReference<BlockingConnection>(); + final String TOPICA = "TopicA"; + + for (int i = 1; i <= 10; ++i) { + + LOG.info("Creating MQTT Connection {}", i); + + MQTT mqtt = createMQTTConnection(clientId, false); + mqtt.setKeepAlive((short) 2); + final BlockingConnection connection = mqtt.blockingConnection(); + connection.connect(); + connection.publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); + + assertTrue("Client connect failed for attempt: " + i, Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return connection.isConnected(); + } + })); + + if (oldConnection.get() != null) { + + assertTrue("Old client still connected", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisified() throws Exception { + return !oldConnection.get().isConnected(); + } + })); + } + + oldConnection.set(connection); + } + + oldConnection.get().publish(TOPICA, TOPICA.getBytes(), QoS.EXACTLY_ONCE, true); + oldConnection.get().disconnect(); + } + @Test(timeout = 30 * 10000) public void testJmsMapping() throws Exception { doTestJmsMapping("test.foo");
