Repository: qpid-jms Updated Branches: refs/heads/master dc4b57067 -> d96150771
QPIDJMS-157 Add timeout tests for temp Topic and Queue create / delete handling. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/d9615077 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/d9615077 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/d9615077 Branch: refs/heads/master Commit: d96150771817ed30fae0fc72c56c8b2041991720 Parents: dc4b570 Author: Timothy Bish <[email protected]> Authored: Mon Mar 28 11:40:18 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon Mar 28 11:40:18 2016 -0400 ---------------------------------------------------------------------- .../jms/integration/SessionIntegrationTest.java | 104 ++++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 160 +++++++++++-------- 2 files changed, 193 insertions(+), 71 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9615077/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index e16fa55..ffb59b9 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -385,6 +385,29 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testCreateTemporaryQueueTimesOut() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setRequestTimeout(500); + connection.start(); + + testPeer.expectBegin(); + testPeer.expectTempQueueCreationAttach(null, false); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try { + session.createTemporaryQueue(); + fail("Should have timed out on create."); + } catch (JmsOperationTimedOutException jmsEx) { + LOG.info("Caught expected exception: {}", jmsEx.getMessage()); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testCreateAndDeleteTemporaryQueue() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); @@ -406,6 +429,35 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testDeleteTemporaryQueueTimesOut() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setRequestTimeout(500); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String dynamicAddress = "myTempQueueAddress"; + testPeer.expectTempQueueCreationAttach(dynamicAddress); + TemporaryQueue tempQueue = session.createTemporaryQueue(); + + // Deleting the TemporaryQueue will be achieved by closing its creating link. + testPeer.expectDetach(true, false, true); + + try { + tempQueue.delete(); + fail("Should have timed out waiting to delete."); + } catch (JmsOperationTimedOutException jmsEx) { + LOG.info("Caught expected exception: {}", jmsEx.getMessage()); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testCreateTemporaryTopic() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); @@ -427,6 +479,29 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testCreateTemporaryTopicTimesOut() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setRequestTimeout(500); + connection.start(); + + testPeer.expectBegin(); + testPeer.expectTempTopicCreationAttach(null, false); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + try { + session.createTemporaryTopic(); + fail("Should have timed out on create."); + } catch (JmsOperationTimedOutException jmsEx) { + LOG.info("Caught expected exception: {}", jmsEx.getMessage()); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testCreateAndDeleteTemporaryTopic() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { Connection connection = testFixture.establishConnecton(testPeer); @@ -448,6 +523,35 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testDeleteTemporaryTopicTimesOut() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + JmsConnection connection = (JmsConnection) testFixture.establishConnecton(testPeer); + connection.setRequestTimeout(500); + + connection.start(); + + testPeer.expectBegin(); + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + + String dynamicAddress = "myTempTopicAddress"; + testPeer.expectTempTopicCreationAttach(dynamicAddress); + TemporaryTopic tempTopic = session.createTemporaryTopic(); + + // Deleting the TemporaryTopic will be achieved by closing its creating link. + testPeer.expectDetach(true, false, true); + + try { + tempTopic.delete(); + fail("Should have timed out waiting to delete."); + } catch (JmsOperationTimedOutException jmsEx) { + LOG.info("Caught expected exception: {}", jmsEx.getMessage()); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testCreateConsumerSourceContainsQueueCapability() throws Exception { doCreateConsumerSourceContainsCapabilityTestImpl(Queue.class); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/d9615077/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index 296c108..582a0d2 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -717,11 +717,21 @@ public class TestAmqpPeer implements AutoCloseable expectTempNodeCreationAttach(dynamicAddress, AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY, false, false, null, null); } + public void expectTempQueueCreationAttach(final String dynamicAddress, boolean sendReponse) + { + expectTempNodeCreationAttach(dynamicAddress, AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY, sendReponse, false, false, null, null); + } + public void expectTempTopicCreationAttach(final String dynamicAddress) { expectTempNodeCreationAttach(dynamicAddress, AmqpDestinationHelper.TEMP_TOPIC_CAPABILITY, false, false, null, null); } + public void expectTempTopicCreationAttach(final String dynamicAddress, boolean sendReponse) + { + expectTempNodeCreationAttach(dynamicAddress, AmqpDestinationHelper.TEMP_TOPIC_CAPABILITY, sendReponse, false, false, null, null); + } + public void expectAndRefuseTempQueueCreationAttach(Symbol errorType, String errorMessage, boolean deferAttachResponseWrite) { expectTempNodeCreationAttach(null, AmqpDestinationHelper.TEMP_QUEUE_CAPABILITY, true, deferAttachResponseWrite, errorType, errorMessage); @@ -734,6 +744,11 @@ public class TestAmqpPeer implements AutoCloseable private void expectTempNodeCreationAttach(final String dynamicAddress, final Symbol nodeTypeCapability, final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) { + expectTempNodeCreationAttach(dynamicAddress, nodeTypeCapability, true, refuseLink, deferAttachResponseWrite, errorType, errorMessage); + } + + private void expectTempNodeCreationAttach(final String dynamicAddress, final Symbol nodeTypeCapability, boolean sendResponse, final boolean refuseLink, boolean deferAttachResponseWrite, Symbol errorType, String errorMessage) + { TargetMatcher targetMatcher = new TargetMatcher(); targetMatcher.withAddress(nullValue()); targetMatcher.withDynamic(equalTo(true)); @@ -751,95 +766,98 @@ public class TestAmqpPeer implements AutoCloseable .withSource(notNullValue()) .withTarget(targetMatcher); - final AttachFrame attachResponse = new AttachFrame() - .setRole(Role.RECEIVER) - .setSndSettleMode(SenderSettleMode.UNSETTLED) - .setRcvSettleMode(ReceiverSettleMode.FIRST); - - // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. - final FrameSender attachResponseSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null); - attachResponseSender.setValueProvider(new ValueProvider() + if (sendResponse) { - @Override - public void setValues() - { - Object receivedHandle = attachMatcher.getReceivedHandle(); + final AttachFrame attachResponse = new AttachFrame() + .setRole(Role.RECEIVER) + .setSndSettleMode(SenderSettleMode.UNSETTLED) + .setRcvSettleMode(ReceiverSettleMode.FIRST); - attachResponseSender.setChannel(attachMatcher.getActualChannel()); - attachResponse.setHandle(receivedHandle); - attachResponse.setName(attachMatcher.getReceivedName()); - attachResponse.setSource(attachMatcher.getReceivedSource()); + // The response frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. + final FrameSender attachResponseSender = new FrameSender(this, FrameType.AMQP, -1, attachResponse, null); + attachResponseSender.setValueProvider(new ValueProvider() + { + @Override + public void setValues() + { + Object receivedHandle = attachMatcher.getReceivedHandle(); + + attachResponseSender.setChannel(attachMatcher.getActualChannel()); + attachResponse.setHandle(receivedHandle); + attachResponse.setName(attachMatcher.getReceivedName()); + attachResponse.setSource(attachMatcher.getReceivedSource()); + + if (!refuseLink) { + Target t = (Target) createTargetObjectFromDescribedType(attachMatcher.getReceivedTarget()); + t.setAddress(dynamicAddress); + attachResponse.setTarget(t); + } else { + attachResponse.setTarget(null); + } - if (!refuseLink) { - Target t = (Target) createTargetObjectFromDescribedType(attachMatcher.getReceivedTarget()); - t.setAddress(dynamicAddress); - attachResponse.setTarget(t); - } else { - attachResponse.setTarget(null); + _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle; } + }); - _lastInitiatedLinkHandle = (UnsignedInteger) receivedHandle; + if (deferAttachResponseWrite) + { + // Defer writing the attach frame until the subsequent frame is also ready + attachResponseSender.setDeferWrite(true); } - }); - if (deferAttachResponseWrite) - { - // Defer writing the attach frame until the subsequent frame is also ready - attachResponseSender.setDeferWrite(true); - } + CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); + composite.add(attachResponseSender); - CompositeAmqpPeerRunnable composite = new CompositeAmqpPeerRunnable(); - composite.add(attachResponseSender); + if (!refuseLink) { + final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded + .setIncomingWindow(UnsignedInteger.valueOf(2048)) + .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldn't be hard coded + .setOutgoingWindow(UnsignedInteger.valueOf(2048)) + .setLinkCredit(UnsignedInteger.valueOf(100)); - if (!refuseLink) { - final FlowFrame flowFrame = new FlowFrame().setNextIncomingId(UnsignedInteger.ONE) //TODO: shouldnt be hard coded - .setIncomingWindow(UnsignedInteger.valueOf(2048)) - .setNextOutgoingId(UnsignedInteger.ONE) //TODO: shouldn't be hard coded - .setOutgoingWindow(UnsignedInteger.valueOf(2048)) - .setLinkCredit(UnsignedInteger.valueOf(100)); + // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. + final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null); + flowFrameSender.setValueProvider(new ValueProvider() + { + @Override + public void setValues() + { + flowFrameSender.setChannel(attachMatcher.getActualChannel()); + flowFrame.setHandle(attachMatcher.getReceivedHandle()); + flowFrame.setDeliveryCount(attachMatcher.getReceivedInitialDeliveryCount()); + } + }); - // The flow frame channel will be dynamically set based on the incoming frame. Using the -1 is an illegal placeholder. - final FrameSender flowFrameSender = new FrameSender(this, FrameType.AMQP, -1, flowFrame, null); - flowFrameSender.setValueProvider(new ValueProvider() - { - @Override - public void setValues() + composite.add(flowFrameSender); + } else { + final DetachFrame detachResponse = new DetachFrame().setClosed(true); + if (errorType != null) { - flowFrameSender.setChannel(attachMatcher.getActualChannel()); - flowFrame.setHandle(attachMatcher.getReceivedHandle()); - flowFrame.setDeliveryCount(attachMatcher.getReceivedInitialDeliveryCount()); - } - }); + org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); - composite.add(flowFrameSender); - } else { - final DetachFrame detachResponse = new DetachFrame().setClosed(true); - if (errorType != null) - { - org.apache.qpid.jms.test.testpeer.describedtypes.Error detachError = new org.apache.qpid.jms.test.testpeer.describedtypes.Error(); + detachError.setCondition(errorType); + detachError.setDescription(errorMessage); - detachError.setCondition(errorType); - detachError.setDescription(errorMessage); + detachResponse.setError(detachError); + } - detachResponse.setError(detachError); - } + // The response frame channel will be dynamically set based on the + // incoming frame. Using the -1 is an illegal placeholder. + final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null); + detachResonseSender.setValueProvider(new ValueProvider() { + @Override + public void setValues() { + detachResonseSender.setChannel(attachMatcher.getActualChannel()); + detachResponse.setHandle(attachMatcher.getReceivedHandle()); + } + }); - // The response frame channel will be dynamically set based on the - // incoming frame. Using the -1 is an illegal placeholder. - final FrameSender detachResonseSender = new FrameSender(this, FrameType.AMQP, -1, detachResponse, null); - detachResonseSender.setValueProvider(new ValueProvider() { - @Override - public void setValues() { - detachResonseSender.setChannel(attachMatcher.getActualChannel()); - detachResponse.setHandle(attachMatcher.getReceivedHandle()); - } - }); + composite.add(detachResonseSender); + } - composite.add(detachResonseSender); + attachMatcher.onCompletion(composite); } - attachMatcher.onCompletion(composite); - addHandler(attachMatcher); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
