Repository: activemq-artemis Updated Branches: refs/heads/master 29062935a -> 9d680cc0b
ARTEMIS-2140 queue creation race w/AMQP shared subs Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/e81453e6 Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/e81453e6 Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/e81453e6 Branch: refs/heads/master Commit: e81453e6608ce70436576f1ed54b1f62d30ddc2e Parents: 2906293 Author: Justin Bertram <jbert...@apache.org> Authored: Fri Nov 9 11:38:28 2018 -0600 Committer: Clebert Suconic <clebertsuco...@apache.org> Committed: Wed Nov 14 10:49:15 2018 -0500 ---------------------------------------------------------------------- .../amqp/broker/AMQPSessionCallback.java | 2 ++ .../amqp/JMSMessageConsumerTest.java | 31 ++++++++++++++++++++ 2 files changed, 33 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e81453e6/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 14c1042..1ca4410 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -284,6 +284,8 @@ public class AMQPSessionCallback implements SessionCallback { serverSession.createQueue(address, queueName, routingType, filter, false, false, -1, true, true); } catch (ActiveMQSecurityException se) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.securityErrorCreatingConsumer(se.getMessage()); + } catch (ActiveMQQueueExistsException e) { + // ignore as may be caused by multiple, concurrent clients } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/e81453e6/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java index 0d8fdde..485d886 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/JMSMessageConsumerTest.java @@ -21,7 +21,10 @@ import java.util.Enumeration; import java.util.Random; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.BytesMessage; import javax.jms.Connection; @@ -35,6 +38,7 @@ import javax.jms.MessageProducer; import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TextMessage; +import javax.jms.Topic; import org.apache.activemq.artemis.core.server.Queue; import org.apache.activemq.artemis.tests.util.Wait; @@ -836,4 +840,31 @@ public class JMSMessageConsumerTest extends JMSClientTestSupport { connection.close(); } } + + @Test + public void testConcurrentSharedConsumerConnections() throws Exception { + final int concurrentConnections = 20; + final ExecutorService executorService = Executors.newFixedThreadPool(concurrentConnections); + + final AtomicBoolean failedToSubscribe = new AtomicBoolean(false); + for (int i = 1; i < concurrentConnections; i++) { + executorService.submit(() -> { + try (Connection connection = createConnection()) { + connection.start(); + @SuppressWarnings("resource") + final Session session = connection.createSession(); + final Topic topic = session.createTopic("topics.foo"); + session.createSharedConsumer(topic, "MY_SUB"); + Thread.sleep(100); + } catch (final Exception ex) { + ex.printStackTrace(); + failedToSubscribe.set(true); + } + }); + } + executorService.shutdown(); + executorService.awaitTermination(30, TimeUnit.SECONDS); + + assertFalse(failedToSubscribe.get()); + } }