Repository: qpid-jms
Updated Branches:
  refs/heads/master baf15b096 -> 30e54e086
https://issues.apache.org/jira/browse/QPIDJMS-137 Allow a request timeout to be set that will fail an attempted resource create when a response is not sent in time. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/30e54e08 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/30e54e08 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/30e54e08 Branch: refs/heads/master Commit: 30e54e086d226696d930fb8cd00ccd9feae14702 Parents: baf15b0 Author: Timothy Bish <[email protected]> Authored: Wed Dec 2 18:12:51 2015 -0500 Committer: Timothy Bish <[email protected]> Committed: Wed Dec 2 18:12:51 2015 -0500 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpProvider.java | 72 ++++++++---- .../jms/provider/amqp/AmqpResourceParent.java | 5 + .../qpid/jms/provider/amqp/AmqpSession.java | 1 + .../provider/amqp/AmqpTransactionContext.java | 7 +- .../amqp/builders/AmqpConnectionBuilder.java | 19 ++-- .../amqp/builders/AmqpResourceBuilder.java | 49 +++++++- .../AmqpTemporaryDestinationBuilder.java | 12 +- .../integration/ConnectionIntegrationTest.java | 25 +++++ .../jms/integration/SessionIntegrationTest.java | 112 ++++++++++++++++++- .../qpid/jms/test/testpeer/TestAmqpPeer.java | 109 +++++++++--------- 10 files changed, 313 insertions(+), 98 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 415db33..30889d3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -26,6 +26,7 @@ import java.net.URI; import java.nio.ByteBuffer; import java.security.Principal; import java.util.concurrent.Executors; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; @@ -116,8 +117,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP private boolean presettleProducers; private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT; private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT; - private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT; - private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT; private int channelMax = DEFAULT_CHANNEL_MAX; private int idleTimeout = 60000; private long sessionOutoingWindow = -1; //Use proton default @@ -275,8 +274,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP public void processConnectionInfo(JmsConnectionInfo connectionInfo) throws Exception { closeTimeout = connectionInfo.getCloseTimeout(); connectTimeout = connectionInfo.getConnectTimeout(); - sendTimeout = connectionInfo.getSendTimeout(); - requestTimeout = connectionInfo.getRequestTimeout(); if (getMaxFrameSize() > 0) { protonTransport.setMaxFrameSize(getMaxFrameSize()); @@ -916,12 +913,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP } } - private void checkClosed() throws ProviderClosedException { - if (closed.get()) { - throw new ProviderClosedException("This Provider is already closed"); - } - } - @Override public void addChildResource(AmqpResource resource) { if (resource instanceof AmqpConnection) { @@ -1058,11 +1049,11 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP } public long getRequestTimeout() { - return requestTimeout; + return 
connection != null ? connection.getResourceInfo().getRequestTimeout() : JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT; } public long getSendTimeout() { - return sendTimeout; + return connection != null ? connection.getResourceInfo().getSendTimeout() : JmsConnectionInfo.DEFAULT_SEND_TIMEOUT; } public void setPresettle(boolean presettle) { @@ -1134,6 +1125,55 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP return this.serializer; } + @Override + public AmqpProvider getProvider() { + return this; + } + + /** + * Allows a resource to request that its parent resource schedule a future + * cancellation of a request and return it a {@link Future} instance that + * can be used to cancel the scheduled automatic failure of the request. + * + * @param request + * The request that should be marked as failed based on configuration. + * @param error + * The error to use when failing the pending request. + * + * @return a {@link ScheduledFuture} that can be stored by the caller. 
+ */ + public ScheduledFuture<?> scheduleRequestTimeout(final AsyncResult request, final Exception error) { + if (getRequestTimeout() != JmsConnectionInfo.INFINITE) { + return serializer.schedule(new Runnable() { + + @Override + public void run() { + request.onFailure(error); + pumpToProtonTransport(); + } + + }, getRequestTimeout(), TimeUnit.MILLISECONDS); + } + + return null; + } + + Principal getLocalPrincipal() { + if (transport instanceof SSLTransport) { + return ((SSLTransport) transport).getLocalPrincipal(); + } + + return null; + } + + //----- Internal implementation ------------------------------------------// + + private void checkClosed() throws ProviderClosedException { + if (closed.get()) { + throw new ProviderClosedException("This Provider is already closed"); + } + } + private final class IdleTimeoutCheck implements Runnable { @Override public void run() { @@ -1170,14 +1210,6 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP } } - Principal getLocalPrincipal() { - if (transport instanceof SSLTransport) { - return ((SSLTransport) transport).getLocalPrincipal(); - } - - return null; - } - private static void setHostname(Sasl sasl, String hostname) { // TODO: this is a hack until Proton 0.10+ is available with sasl#setHostname method. 
try { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java index 6486719..5213a78 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpResourceParent.java @@ -40,4 +40,9 @@ public interface AmqpResourceParent { */ void removeChildResource(AmqpResource resource); + /** + * @return a reference to the root AmqpProvider. + */ + AmqpProvider getProvider(); + } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index f9b461f..eb49c5f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -259,6 +259,7 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i getConnection().getProvider().fireProviderException(error); } + @Override public AmqpProvider getProvider() { return connection.getProvider(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java ---------------------------------------------------------------------- diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java index 930fd32..b650892 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpTransactionContext.java @@ -253,7 +253,7 @@ public class AmqpTransactionContext implements AmqpResourceParent { txConsumers.clear(); } - //----- Resource Parent event handlers -----------------------------------// + //----- Resource Parent implementation -----------------------------------// @Override public void addChildResource(AmqpResource resource) { @@ -268,4 +268,9 @@ public class AmqpTransactionContext implements AmqpResourceParent { // to check if the current TX has failed due to link closed during // normal operations. } + + @Override + public AmqpProvider getProvider() { + return session.getProvider(); + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java index cad66a3..2de00f3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConnectionBuilder.java @@ -18,7 +18,6 @@ package org.apache.qpid.jms.provider.amqp.builders; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.SOLE_CONNECTION_CAPABILITY; -import java.io.IOException; import java.util.LinkedHashMap; import java.util.Map; @@ -96,16 +95,6 @@ public class 
AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A } @Override - protected void handleOpened(AmqpProvider provider) throws IOException { - // Initialize the connection properties so that the state of the remote can - // be determined, this allows us to check for close pending. - getResource().getProperties().initialize( - getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties()); - - super.handleOpened(provider); - } - - @Override protected Connection createEndpoint(JmsConnectionInfo resourceInfo) { String hostname = getParent().getVhost(); if (hostname == null) { @@ -134,6 +123,14 @@ public class AmqpConnectionBuilder extends AmqpResourceBuilder<AmqpConnection, A } @Override + protected void afterOpened() { + // Initialize the connection properties so that the state of the remote can + // be determined, this allows us to check for close pending. + getResource().getProperties().initialize( + getEndpoint().getRemoteOfferedCapabilities(), getEndpoint().getRemoteProperties()); + } + + @Override protected boolean isClosePending() { return getResource().getProperties().isConnectionOpenFailed(); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java index a6618e7..d8590ef 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpResourceBuilder.java @@ -17,6 +17,7 @@ package org.apache.qpid.jms.provider.amqp.builders; import java.io.IOException; +import java.util.concurrent.ScheduledFuture; import 
org.apache.qpid.jms.meta.JmsResource; import org.apache.qpid.jms.provider.AsyncResult; @@ -42,6 +43,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex private static final Logger LOG = LoggerFactory.getLogger(AmqpResourceBuilder.class); protected AsyncResult request; + protected ScheduledFuture<?> requestTimeoutTask; protected TARGET resource; protected ENDPOINT endpoint; protected final PARENT parent; @@ -61,7 +63,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex * @param request * The request that initiated the resource creation. */ - public void buildResource(AsyncResult request) { + public void buildResource(final AsyncResult request) { this.request = request; // Create the local end of the manage resource. @@ -71,6 +73,32 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex // Create the resource object now resource = createResource(parent, resourceInfo, endpoint); + + if (parent.getProvider().getRequestTimeout() > 0) { + + // Attempt to schedule a cancellation of the pending open request, can return + // null if there is no configured request timeout. + requestTimeoutTask = parent.getProvider().scheduleRequestTimeout(new AsyncResult() { + + @Override + public void onSuccess() { + // Nothing to do here. + } + + @Override + public void onFailure(Throwable result) { + // We ignore the default error and attempt to coerce a more + // meaningful error from the endpoint. 
+ handleClosed(parent.getProvider()); + } + + @Override + public boolean isComplete() { + return request.isComplete(); + } + + }, null); + } } //----- Event handlers ---------------------------------------------------// @@ -102,15 +130,20 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex //----- Standard open and close handlers ---------------------------------// - protected void handleOpened(AmqpProvider provider) throws IOException { + protected final void handleOpened(AmqpProvider provider) { + + // perform any post open processing prior to opened state inspection. + afterOpened(); if (isClosePending()) { return; } - if (isOpenedEndpointValid()) { - afterOpened(); + if (requestTimeoutTask != null) { + requestTimeoutTask.cancel(false); + } + if (isOpenedEndpointValid()) { getEndpoint().setContext(resource); getParent().addChildResource(resource); getRequest().onSuccess(); @@ -125,7 +158,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex } } - protected void handleClosed(AmqpProvider provider) throws IOException { + protected final void handleClosed(AmqpProvider provider) { // If the resource being built is closed during the creation process // then this is always an error. @@ -136,6 +169,10 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex openError = getOpenAbortException(); } + if (requestTimeoutTask != null) { + requestTimeoutTask.cancel(false); + } + LOG.warn("Open of resource:({}) failed: {}", resourceInfo, openError.getMessage()); // This resource is now terminated. @@ -193,7 +230,7 @@ public abstract class AmqpResourceBuilder<TARGET extends AmqpResource, PARENT ex /** * Called once an endpoint has been opened and validated to give the subclasses a * place to perform any follow-on processing or setup steps before the operation - * is deemed to have been completed and success is signalled. + * is deemed to have been completed and success is signaled. 
*/ protected void afterOpened() { // Nothing to do here. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java index 01f173b..954d9b3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpTemporaryDestinationBuilder.java @@ -105,12 +105,14 @@ public class AmqpTemporaryDestinationBuilder extends AmqpResourceBuilder<AmqpTem @Override protected void afterOpened() { - // Once our sender is opened we can read the updated name from the target address. - String oldDestinationName = resourceInfo.getName(); - String destinationName = getEndpoint().getRemoteTarget().getAddress(); + if (!isClosePending()) { + // Once our sender is opened we can read the updated name from the target address. 
+ String oldDestinationName = resourceInfo.getName(); + String destinationName = getEndpoint().getRemoteTarget().getAddress(); - resourceInfo.setName(destinationName); + resourceInfo.setName(destinationName); - LOG.trace("Updated temp destination to: {} from: {}", destinationName, oldDestinationName); + LOG.trace("Updated temp destination to: {} from: {}", destinationName, oldDestinationName); + } } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java index a84d260..d56bb62 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionIntegrationTest.java @@ -124,6 +124,31 @@ public class ConnectionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testCreateTransactedSessionFailsWhenNoDetachResponseSent() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + ((JmsConnection) connection).setRequestTimeout(500); + + testPeer.expectBegin(); + // Expect the session, with an immediate link to the transaction coordinator + // using a target with the expected capabilities only. 
+ CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN)); + testPeer.expectSenderAttach(notNullValue(), txCoordinatorMatcher, true, true, false, 0, null, null); + testPeer.expectDetach(true, false, false); + + try { + connection.createSession(true, Session.SESSION_TRANSACTED); + fail("Session create should have failed."); + } catch (JMSException ex) { + // Expected + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testRemotelyCloseConnectionDuringSessionCreation() throws Exception { final String BREAD_CRUMB = "ErrorMessageBreadCrumb"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index 4d36e8c..bf26f20 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -47,6 +47,7 @@ import javax.jms.MessageConsumer; import javax.jms.MessageListener; import javax.jms.MessageProducer; import javax.jms.Queue; +import javax.jms.QueueBrowser; import javax.jms.Session; import javax.jms.TemporaryQueue; import javax.jms.TemporaryTopic; @@ -214,6 +215,77 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testCreateConsumerFailsWhenLinkRefusalResponseNotSent() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + + ((JmsConnection) connection).setRequestTimeout(500); + + connection.start(); + + 
testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + + // Expect a link to a topic node, which we will then refuse + SourceMatcher targetMatcher = new SourceMatcher(); + targetMatcher.withAddress(equalTo(topicName)); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + testPeer.expectReceiverAttach(notNullValue(), targetMatcher, false, true, true, false, null, null); + testPeer.expectDetach(true, false, false); + + try { + // Create a consumer, expect it to throw exception due to the link-refusal + // even though there is no detach response. + session.createConsumer(dest); + fail("Consumer creation should have failed when link was refused"); + } catch(JMSException ex) { + // Expected + LOG.info("Caught expected error on consumer create: {}", ex.getMessage()); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testCreateBrowserFailsWhenLinkRefusalResponseNotSent() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + + ((JmsConnection) connection).setRequestTimeout(500); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String queueName = "myQueue"; + Queue dest = session.createQueue(queueName); + + testPeer.expectReceiverAttach(notNullValue(), notNullValue(), true, true, true, false, null, null); + testPeer.expectDetach(true, false, false); + + try { + // Create a QueueBrowser, expect it to throw exception due to the link-refusal + // even though there is no detach response. 
+ QueueBrowser browser = session.createBrowser(dest); + browser.getEnumeration(); + fail("Consumer creation should have failed when link was refused"); + } catch(JMSException ex) { + // Expected + LOG.info("Caught expected error on browser create: {}", ex.getMessage()); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testCreateTemporaryQueueFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception { doCreateTemporaryDestinationFailsWhenLinkRefusedTestImpl(false, false); } @@ -421,7 +493,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { String topicName = "myTopic"; Topic destination = session.createTopic(topicName); - testPeer.expectReceiverAttach(notNullValue(), notNullValue(), false, true, false, AmqpError.UNAUTHORIZED_ACCESS, "Destination is not readable"); + testPeer.expectReceiverAttach(notNullValue(), notNullValue(), false, true, false, false, AmqpError.UNAUTHORIZED_ACCESS, "Destination is not readable"); testPeer.expectDetach(true, true, true); try { @@ -446,7 +518,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { String topicName = "myTopic"; Topic destination = session.createTopic(topicName); - testPeer.expectSenderAttach(notNullValue(), notNullValue(), true, true, 0L, AmqpError.UNAUTHORIZED_ACCESS, "Destination is not readable"); + testPeer.expectSenderAttach(notNullValue(), notNullValue(), true, false, true, 0L, AmqpError.UNAUTHORIZED_ACCESS, "Destination is not readable"); testPeer.expectDetach(true, true, true); try { @@ -897,6 +969,42 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testCreateProducerFailsWhenLinkRefusedNoDetachSent() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer); + ((JmsConnection) connection).setRequestTimeout(500); + connection.start(); + + testPeer.expectBegin(); + Session session = 
connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String topicName = "myTopic"; + Topic dest = session.createTopic(topicName); + + // Expect a link to a topic node, which we will then refuse + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(equalTo(topicName)); + targetMatcher.withDynamic(equalTo(false)); + targetMatcher.withDurable(equalTo(TerminusDurability.NONE)); + + testPeer.expectSenderAttach(notNullValue(), targetMatcher, true, true, false, 0, null, null); + // Expect the detach response to the test peer closing the producer link after refusal. + testPeer.expectDetach(true, false, false); + + try { + // Create a producer, expect it to throw exception due to the link-refusal + session.createProducer(dest); + fail("Producer creation should have failed when link was refused"); + } catch(JMSException ex) { + // Expected + LOG.info("Caught expected exception on create: {}", ex.getMessage()); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testCreateAnonymousProducerWhenAnonymousRelayNodeIsNotSupported() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/30e54e08/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 7fc557b..9396e3b 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -845,7 +845,7 @@ public class TestAmqpPeer implements AutoCloseable public void expectSenderAttach(long creditFlowDelay) { - expectSenderAttach(notNullValue(), notNullValue(), false, false, creditFlowDelay, 
null, null); + expectSenderAttach(notNullValue(), notNullValue(), false, false, false, creditFlowDelay, null, null); } public void expectSenderAttach(final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite) @@ -855,10 +855,10 @@ public class TestAmqpPeer implements AutoCloseable public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite) { - expectSenderAttach(notNullValue(), targetMatcher, refuseLink, deferAttachResponseWrite, 0, null, null); + expectSenderAttach(notNullValue(), targetMatcher, refuseLink, false, deferAttachResponseWrite, 0, null, null); } - public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean deferAttachResponseWrite, long creditFlowDelay, Symbol errorType, String errorMessage) + public void expectSenderAttach(final Matcher<?> sourceMatcher, final Matcher<?> targetMatcher, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, long creditFlowDelay, Symbol errorType, String errorMessage) { final AttachMatcher attachMatcher = new AttachMatcher() .withName(notNullValue()) @@ -908,53 +908,56 @@ public class TestAmqpPeer implements AutoCloseable attachResponseSender.setDeferWrite(true); } - final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded + CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); + composite.add(attachResponseSender); + if (refuseLink) { + if (!omitDetach) { + final DetachFrame detachResponse = new DetachFrame().setClosed(true); + if (errorType != null) + { + org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); + + detachError.setCondition(errorType); + detachError.setDescription(errorMessage); + + detachResponse.setError(detachError); + } + + // The 
response frame channel will be dynamically set based on the + // incoming frame. Using the -1 is an illegal placeholder. + final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null); + detachResonseSender.setValueProvider(new ValueProvider() { + @Override + public void setValues() { + detachResonseSender.setChannel(attachMatcher.getActualChannel()); + detachResponse.setHandle(attachMatcher.getReceivedHandle()); + } + }); + + composite.add(detachResonseSender); + } + } else { + final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded .setIncomingWindow(UnsignedInteger.valueOf(2048)) .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded .setOutgoingWindow(UnsignedInteger.valueOf(2048)) .setLinkCredit(UnsignedInteger.valueOf(100)); - // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. - final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null); - flowFrameSender.setValueProvider(new ValueProvider() - { - @Override - public void setValues() + // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. 
+ final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null); + flowFrameSender.setValueProvider(new ValueProvider() { - flowFrameSender.setChannel(attachMatcher.getActualChannel()); - flowFrame.setHandle(attachMatcher.getReceivedHandle()); - flowFrame.setDeliveryCount(attachMatcher.getReceivedInitialDeliveryCount()); - } - }); - flowFrameSender.setSendDelay(creditFlowDelay); - - final DetachFrame detachResponse = new DetachFrame().setClosed(true); - if (errorType != null) - { - org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); - - detachError.setCondition(errorType); - detachError.setDescription(errorMessage); - - detachResponse.setError(detachError); - } + @Override + public void setValues() + { + flowFrameSender.setChannel(attachMatcher.getActualChannel()); + flowFrame.setHandle(attachMatcher.getReceivedHandle()); + flowFrame.setDeliveryCount(attachMatcher.getReceivedInitialDeliveryCount()); + } + }); - // The response frame channel will be dynamically set based on the - // incoming frame. Using the -1 is an illegal placeholder. 
- final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null); - detachResonseSender.setValueProvider(new ValueProvider() { - @Override - public void setValues() { - detachResonseSender.setChannel(attachMatcher.getActualChannel()); - detachResponse.setHandle(attachMatcher.getReceivedHandle()); - } - }); + flowFrameSender.setSendDelay(creditFlowDelay); - CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); - composite.add(attachResponseSender); - if (refuseLink) { - composite.add(detachResonseSender); - } else { composite.add(flowFrameSender); } @@ -975,7 +978,7 @@ public class TestAmqpPeer implements AutoCloseable public void expectCoordinatorAttach(final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) { - expectSenderAttach(notNullValue(), new CoordinatorMatcher(), refuseLink, deferAttachResponseWrite, 0, errorType, errorMessage); + expectSenderAttach(notNullValue(), new CoordinatorMatcher(), refuseLink, false, deferAttachResponseWrite, 0, errorType, errorMessage); } public void expectQueueBrowserAttach() @@ -990,25 +993,25 @@ public class TestAmqpPeer implements AutoCloseable public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher) { - expectReceiverAttach(linkNameMatcher, sourceMatcher, false, false, false, null, null); + expectReceiverAttach(linkNameMatcher, sourceMatcher, false, false, false, false, null, null); } public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled) { - expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, false, false, null, null); + expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, false, false, false, null, null); } public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean refuseLink, boolean deferAttachResponseWrite) { - 
expectReceiverAttach(linkNameMatcher, sourceMatcher, false, refuseLink, deferAttachResponseWrite, null, null); + expectReceiverAttach(linkNameMatcher, sourceMatcher, false, refuseLink, false, deferAttachResponseWrite, null, null); } public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, boolean deferAttachResponseWrite) { - expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, deferAttachResponseWrite, null, null); + expectReceiverAttach(linkNameMatcher, sourceMatcher, settled, refuseLink, false, deferAttachResponseWrite, null, null); } - public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) + public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher, final boolean settled, final boolean refuseLink, boolean omitDetach, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) { final AttachMatcher attachMatcher = new AttachMatcher() .withName(linkNameMatcher) @@ -1057,7 +1060,7 @@ public class TestAmqpPeer implements AutoCloseable CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); composite.add(attachResponseSender); - if (refuseLink) + if (refuseLink && !omitDetach) { final DetachFrame detachResponse = new DetachFrame().setClosed(true); if (errorType != null) @@ -1072,18 +1075,18 @@ public class TestAmqpPeer implements AutoCloseable // The response frame channel will be dynamically set based on the // incoming frame. Using the -1 is an illegal placeholder. 
- final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null); - detachResonseSender.setValueProvider(new ValueProvider() + final FrameSender detachResponseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null); + detachResponseSender.setValueProvider(new ValueProvider() { @Override public void setValues() { - detachResonseSender.setChannel(attachMatcher.getActualChannel()); + detachResponseSender.setChannel(attachMatcher.getActualChannel()); detachResponse.setHandle(attachMatcher.getReceivedHandle()); } }); - composite.add(detachResonseSender); + composite.add(detachResponseSender); } attachMatcher.onCompletion(composite); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
