Repository: activemq
Updated Branches:
  refs/heads/trunk 106f75957 -> 5aef16ad0
https://issues.apache.org/jira/browse/AMQ-5256 - fix spurious amqp ioexception on concurrent connection protocol discrimination, client would see a hang on open. additional test

Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/5aef16ad
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/5aef16ad
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/5aef16ad

Branch: refs/heads/trunk
Commit: 5aef16ad0600f525f4515e55f296a987bffa8058
Parents: 106f759
Author: gtully <gary.tu...@gmail.com>
Authored: Tue Jul 1 22:33:42 2014 +0100
Committer: gtully <gary.tu...@gmail.com>
Committed: Tue Jul 1 22:34:44 2014 +0100

----------------------------------------------------------------------
 .../amqp/AMQPProtocolDiscriminator.java         |  2 +-
 .../transport/amqp/bugs/AMQ5256Test.java        | 68 ++++++++++++++++++++
 2 files changed, 69 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/5aef16ad/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
index bf0c655..58da746 100644
--- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
+++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AMQPProtocolDiscriminator.java
@@ -59,7 +59,7 @@ public class AMQPProtocolDiscriminator implements IAmqpProtocolConverter {
 
     }
 
-    static final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
+    final private ArrayList<Command> pendingCommands = new ArrayList<Command>();
 
     public AMQPProtocolDiscriminator(AmqpTransport transport) {
         this.transport = transport;


http://git-wip-us.apache.org/repos/asf/activemq/blob/5aef16ad/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java
----------------------------------------------------------------------
diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java
new file mode 100644
index 0000000..85acb64
--- /dev/null
+++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/bugs/AMQ5256Test.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.transport.amqp.bugs;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import org.apache.activemq.transport.amqp.AmqpTestSupport;
+import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
+import org.junit.Test;
+
+
+import static org.junit.Assert.assertTrue;
+
+public class AMQ5256Test extends AmqpTestSupport {
+
+    @Override
+    protected boolean isUseTcpConnector() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUseNioPlusSslConnector() {
+        return false;
+    }
+
+    @Test(timeout = 40 * 1000)
+    public void testParallelConnect() throws Exception {
+        final int numThreads = 80;
+        ExecutorService executorService = Executors.newFixedThreadPool(numThreads);
+        for (int i = 0; i < numThreads; i++) {
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+
+                    try {
+                        final ConnectionFactoryImpl connectionFactory = new ConnectionFactoryImpl("localhost", port, "admin", "password", null, isUseSslConnector());
+                        Connection connection = connectionFactory.createConnection();
+                        connection.start();
+                        connection.close();
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+                }
+            });
+        }
+
+        executorService.shutdown();
+        assertTrue("executor done on time", executorService.awaitTermination(30, TimeUnit.SECONDS));
+
+    }
+}