Repository: qpid-jms Updated Branches: refs/heads/master cc4921293 -> 8c9456bfd
QPIDJMS-45: wire up a config option for the local idle timeout, add tests to verify it has an effect and a default exists Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/8c9456bf Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/8c9456bf Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/8c9456bf Branch: refs/heads/master Commit: 8c9456bfd404945e0413a6c689d6d0df88d18be8 Parents: cc49212 Author: Robert Gemmell <[email protected]> Authored: Thu May 7 18:03:34 2015 +0100 Committer: Robert Gemmell <[email protected]> Committed: Thu May 7 18:03:34 2015 +0100 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpProvider.java | 20 ++++- .../integration/IdleTimeoutIntegrationTest.java | 80 ++++++++++++++++++++ .../provider/amqp/AmqpProviderFactoryTest.java | 21 +++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 16 +++- 4 files changed, 133 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8c9456bf/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index b02c148..ff560d1 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -110,6 +110,7 @@ public class AmqpProvider implements Provider, TransportListener { private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT; private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT; private int channelMax = DEFAULT_CHANNEL_MAX; + private int idleTimeout = 60000; 
private final URI remoteURI; private final AtomicBoolean closed = new AtomicBoolean(); @@ -261,8 +262,7 @@ public class AmqpProvider implements Provider, TransportListener { Connection protonConnection = Connection.Factory.create(); protonTransport.setMaxFrameSize(getMaxFrameSize()); protonTransport.setChannelMax(getChannelMax()); - //TODO: wire up idle-timeout config, decide on a default. - protonTransport.setIdleTimeout(60000); + protonTransport.setIdleTimeout(idleTimeout); protonTransport.bind(protonConnection); protonConnection.collect(protonCollector); Sasl sasl = protonTransport.sasl(); @@ -872,6 +872,22 @@ public class AmqpProvider implements Provider, TransportListener { return this.traceBytes; } + public int getIdleTimeout() { + return idleTimeout; + } + + /** + * Sets the idle timeout (in milliseconds) after which the connection will + * be closed if the peer has not sent any data. The provided value will be + * halved before being transmitted as our advertised idle-timeout in the + * AMQP Open frame. + * + * @param idleTimeout the timeout in milliseconds. + */ + public void setIdleTimeout(int idleTimeout) { + this.idleTimeout = idleTimeout; + } + public long getCloseTimeout() { return this.closeTimeout; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8c9456bf/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java new file mode 100644 index 0000000..984b01c --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/IdleTimeoutIntegrationTest.java @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.jms.integration; + +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.greaterThan; +import static org.junit.Assert.assertNull; + +import javax.jms.Connection; +import javax.jms.ConnectionFactory; + +import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.junit.Test; + +public class IdleTimeoutIntegrationTest extends QpidJmsTestCase { + + @Test(timeout = 5000) + public void testIdleTimeoutIsAdvertisedByDefault() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + testPeer.expectAnonymousConnect(true, greaterThan(UnsignedInteger.valueOf(0))); + // Each connection creates a session for managing temporary destinations etc + testPeer.expectBegin(true); + + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort()); + Connection connection = factory.createConnection(); + // Set a clientID to provoke the actual AMQP connection process to occur. 
+ connection.setClientID("clientName"); + + testPeer.waitForAllHandlersToComplete(1000); + assertNull(testPeer.getThrowable()); + + testPeer.expectClose(); + connection.close(); + } + } + + @Test(timeout = 5000) + public void testAdvertisedIdleTimeoutIsHalfOfActualTimeoutValue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + int configuredTimeout = 54320; + int advertisedValue = configuredTimeout / 2; + + testPeer.expectAnonymousConnect(true, equalTo(UnsignedInteger.valueOf(advertisedValue))); + // Each connection creates a session for managing temporary destinations etc + testPeer.expectBegin(true); + + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?amqp.IdleTimeout=" + configuredTimeout); + Connection connection = factory.createConnection(); + // Set a clientID to provoke the actual AMQP connection process to occur. + connection.setClientID("clientName"); + + testPeer.waitForAllHandlersToComplete(1000); + assertNull(testPeer.getThrowable()); + + testPeer.expectClose(); + connection.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8c9456bf/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactoryTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactoryTest.java index c0d9fd3..8ec53fd 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactoryTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderFactoryTest.java @@ -74,6 +74,27 @@ public class AmqpProviderFactoryTest extends QpidJmsTestCase { } @Test(timeout = 10000) + public void testCreateProviderHasDefaultIdleTimeoutValue() throws IOException, Exception { + Provider provider = 
AmqpProviderFactory.create(new URI(peerURI.toString())); + assertNotNull(provider); + assertTrue(provider instanceof AmqpProvider); + AmqpProvider amqpProvider = (AmqpProvider) provider; + + assertTrue("No default idle timeout", amqpProvider.getIdleTimeout() > 0); + } + + @Test(timeout = 10000) + public void testCreateProviderAppliesIdleTimeoutURIOption() throws IOException, Exception { + int timeout = 54321; + Provider provider = AmqpProviderFactory.create(new URI(peerURI.toString() + "?amqp.idleTimeout=" + timeout)); + assertNotNull(provider); + assertTrue(provider instanceof AmqpProvider); + AmqpProvider amqpProvider = (AmqpProvider) provider; + + assertEquals("idle timeout option was not applied", timeout, amqpProvider.getIdleTimeout()); + } + + @Test(timeout = 10000) public void testCreateProviderAppliesOptions() throws IOException, Exception { URI configuredURI = new URI(peerURI.toString() + "?amqp.presettleConsumers=true" + http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/8c9456bf/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 6869b8e..3dd6506 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -322,6 +322,11 @@ public class TestAmqpPeer implements AutoCloseable public void expectAnonymousConnect(boolean authorize) { + expectAnonymousConnect(authorize, null); + } + + public void expectAnonymousConnect(boolean authorize, Matcher<?> idleTimeoutMatcher) + { SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(Symbol.valueOf("ANONYMOUS")); addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, 
AmqpHeader.SASL_HEADER, new FrameSender( @@ -347,12 +352,19 @@ public class TestAmqpPeer implements AutoCloseable addHandler(new HeaderHandlerImpl(AmqpHeader.HEADER, AmqpHeader.HEADER)); - addHandler(new OpenMatcher() + OpenMatcher openMatcher = new OpenMatcher() .withContainerId(notNullValue(String.class)) .onSuccess(new FrameSender( this, FrameType.AMQP, 0, new OpenFrame().setContainerId("test-amqp-peer-container-id"), - null))); + null)); + + if(idleTimeoutMatcher !=null) + { + openMatcher.withIdleTimeOut(idleTimeoutMatcher); + } + + addHandler(openMatcher); } /** --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
