Repository: activemq Updated Branches: refs/heads/master e996dbe7c -> e47edd7a2
https://issues.apache.org/jira/browse/AMQ-6262 Ensure that the connection check task is stopped once commands pass through the inactivity monitor to prevent the transport from being closed for no reason. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/e47edd7a Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/e47edd7a Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/e47edd7a Branch: refs/heads/master Commit: e47edd7a282e1391f480c1278555f4a86e9a8ea9 Parents: e996dbe Author: Timothy Bish <[email protected]> Authored: Mon Apr 25 17:05:09 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon Apr 25 17:05:09 2016 -0400 ---------------------------------------------------------------------- .../transport/http/HttpInactivityMonitor.java | 58 ++++++++++++ .../transport/http/HttpTransportFactory.java | 9 +- .../http/HttpTransportConnectTimeoutTest.java | 93 ++++++++++++++++++++ .../src/test/resources/log4j.properties | 1 + 4 files changed, 156 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/e47edd7a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpInactivityMonitor.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpInactivityMonitor.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpInactivityMonitor.java new file mode 100644 index 0000000..e6b61b0 --- /dev/null +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpInactivityMonitor.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.http; + +import java.io.IOException; + +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.ConnectionInfo; +import org.apache.activemq.transport.InactivityMonitor; +import org.apache.activemq.transport.Transport; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Inactivity Monitor specialization for use with HTTP based transports. + */ +public class HttpInactivityMonitor extends InactivityMonitor { + + private static final Logger LOG = LoggerFactory.getLogger(HttpInactivityMonitor.class); + + /** + * @param next + * The next Transport in the filter chain. + */ + public HttpInactivityMonitor(Transport next) { + super(next, null); + } + + @Override + public void onCommand(Object command) { + if (command.getClass() == ConnectionInfo.class || command.getClass() == BrokerInfo.class) { + synchronized (this) { + try { + LOG.trace("Connection {} attempted on HTTP based transport: {}", command, this); + processInboundWireFormatInfo(null); + } catch (IOException e) { + onException(e); + } + } + } + + super.onCommand(command); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e47edd7a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java index 13b19c0..8332a95 100755 --- a/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java +++ b/activemq-http/src/main/java/org/apache/activemq/transport/http/HttpTransportFactory.java @@ -23,7 +23,6 @@ import java.net.URISyntaxException; import java.util.HashMap; import java.util.Map; -import org.apache.activemq.transport.InactivityMonitor; import org.apache.activemq.transport.Transport; import org.apache.activemq.transport.TransportFactory; import org.apache.activemq.transport.TransportLoggerFactory; @@ -60,7 +59,7 @@ public class HttpTransportFactory extends TransportFactory { if (wireFormat instanceof TextWireFormat) { return (TextWireFormat)wireFormat; } - LOG.trace("Not created with a TextWireFormat: " + wireFormat); + LOG.trace("Not created with a TextWireFormat: {}", wireFormat); return new XStreamWireFormat(); } @@ -94,8 +93,8 @@ public class HttpTransportFactory extends TransportFactory { @SuppressWarnings("rawtypes") public Transport compositeConfigure(Transport transport, WireFormat format, Map options) { transport = super.compositeConfigure(transport, format, options); - HttpClientTransport httpTransport = (HttpClientTransport)transport.narrow(HttpClientTransport.class); - if(httpTransport != null && httpTransport.isTrace() ) { + HttpClientTransport httpTransport = transport.narrow(HttpClientTransport.class); + if (httpTransport != null && httpTransport.isTrace()) { try { transport = TransportLoggerFactory.getInstance().createTransportLogger(transport); } catch (Throwable e) { @@ -104,7 +103,7 @@ public class HttpTransportFactory extends TransportFactory { } boolean useInactivityMonitor = "true".equals(getOption(options, "useInactivityMonitor", "true")); if (useInactivityMonitor) { - transport = new InactivityMonitor(transport, null /* ignore wire format as no negotiation over http */); + transport = new HttpInactivityMonitor(transport); IntrospectionSupport.setProperties(transport, options); } http://git-wip-us.apache.org/repos/asf/activemq/blob/e47edd7a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpTransportConnectTimeoutTest.java ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpTransportConnectTimeoutTest.java b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpTransportConnectTimeoutTest.java new file mode 100644 index 0000000..a8f3d6d --- /dev/null +++ b/activemq-http/src/test/java/org/apache/activemq/transport/http/HttpTransportConnectTimeoutTest.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.http; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import javax.jms.Connection; +import javax.jms.ExceptionListener; +import javax.jms.JMSException; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Queue; +import javax.jms.Session; + +import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.broker.BrokerService; +import org.apache.activemq.broker.TransportConnector; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class HttpTransportConnectTimeoutTest { + + private static final Logger LOG = LoggerFactory.getLogger(HttpTransportConnectTimeoutTest.class); + + private BrokerService broker; + private ActiveMQConnectionFactory factory; + + @Before + public void setUp() throws Exception { + broker = new BrokerService(); + TransportConnector connector = broker.addConnector( + "http://localhost:0?trace=true&transport.connectAttemptTimeout=2000"); + broker.setPersistent(false); + broker.setUseJmx(false); + broker.start(); + + String connectionUri = connector.getPublishableConnectString(); + factory = new ActiveMQConnectionFactory(connectionUri + "?trace=true"); + } + + @After + public void tearDown() throws Exception { + broker.stop(); + } + + @Test(timeout = 60000) + public void testSendReceiveAfterPause() throws Exception { + final CountDownLatch failed = new CountDownLatch(1); + + Connection connection = factory.createConnection(); + connection.start(); + connection.setExceptionListener(new ExceptionListener() { + + @Override + public void onException(JMSException exception) { + LOG.info("Connection failed due to: {}", exception.getMessage()); + failed.countDown(); + } + }); + + assertFalse(failed.await(3, TimeUnit.SECONDS)); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createTemporaryQueue(); + MessageProducer producer = session.createProducer(queue); + MessageConsumer consumer = session.createConsumer(queue); + + producer.send(session.createMessage()); + + assertNotNull(consumer.receive(5000)); + } +} http://git-wip-us.apache.org/repos/asf/activemq/blob/e47edd7a/activemq-http/src/test/resources/log4j.properties ---------------------------------------------------------------------- diff --git a/activemq-http/src/test/resources/log4j.properties b/activemq-http/src/test/resources/log4j.properties index f28c2a8..aa64270 100755 --- a/activemq-http/src/test/resources/log4j.properties +++ b/activemq-http/src/test/resources/log4j.properties @@ -21,6 +21,7 @@ log4j.rootLogger=INFO, out, stdout log4j.logger.org.apache.activemq.transport.ws=DEBUG +log4j.logger.org.apache.activemq.transport.http=DEBUG #log4j.logger.org.apache.activemq.broker.scheduler=DEBUG #log4j.logger.org.apache.activemq.network.DemandForwardingBridgeSupport=DEBUG
