Update anonymous producer creation to check for a connection capability (ANONYMOUS-RELAY) by which the peer signals support for the anonymous relay node
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/03be3ff0 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/03be3ff0 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/03be3ff0 Branch: refs/heads/master Commit: 03be3ff087b9de9e3f864f84c24b2c5a3c6fbfce Parents: 428c782 Author: Robert Gemmell <[email protected]> Authored: Tue Nov 18 17:08:39 2014 +0000 Committer: Robert Gemmell <[email protected]> Committed: Tue Nov 18 17:08:39 2014 +0000 ---------------------------------------------------------------------- .../amqp/AmqpAnonymousProducerWrapper.java | 115 ------------------- .../qpid/jms/provider/amqp/AmqpConnection.java | 1 - .../provider/amqp/AmqpConnectionProperties.java | 17 ++- .../qpid/jms/provider/amqp/AmqpSession.java | 6 +- .../jms/integration/SessionIntegrationTest.java | 73 +++++++++--- 5 files changed, 75 insertions(+), 137 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03be3ff0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java deleted file mode 100644 index cc63ffa..0000000 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousProducerWrapper.java +++ /dev/null @@ -1,115 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.qpid.jms.provider.amqp; - -import java.io.IOException; - -import javax.jms.JMSException; - -import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; -import org.apache.qpid.jms.meta.JmsProducerInfo; -import org.apache.qpid.jms.provider.AsyncResult; -import org.apache.qpid.jms.provider.WrappedAsyncResult; -import org.apache.qpid.proton.engine.EndpointState; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Handles the case of anonymous JMS MessageProducers. - * - * In order to simulate the anonymous producer we must create a sender for each message - * send attempt and close it following a successful send. - */ -public class AmqpAnonymousProducerWrapper extends AmqpProducer { - - private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousProducerWrapper.class); - AmqpProducer delegate; - - /** - * Creates the Anonymous Producer object. - * - * @param session - * the session that owns this producer - * @param info - * the JmsProducerInfo for this producer. 
- */ - public AmqpAnonymousProducerWrapper(AmqpSession session, JmsProducerInfo info) { - super(session, info); - - delegate = new AmqpFixedProducer(session, info); - } - - @Override - public boolean send(JmsOutboundMessageDispatch envelope, AsyncResult request) throws IOException, JMSException { - LOG.trace("Delegating anonymous send to underlying producer: {}", getProducerId()); - - return delegate.send(envelope, request); - } - - @Override - public void open(AsyncResult request) { - AnonymousRelayRequest anonRelayRequest = new AnonymousRelayRequest(request); - delegate.open(anonRelayRequest); - } - - @Override - public void close(AsyncResult request) { - delegate.close(request); - } - - @Override - public boolean isAnonymous() { - return true; - } - - @Override - public EndpointState getLocalState() { - return delegate.getLocalState(); - } - - @Override - public EndpointState getRemoteState() { - return delegate.getRemoteState(); - } - - @Override - public void setPresettle(boolean presettle) { - delegate.setPresettle(presettle); - }; - - private class AnonymousRelayRequest extends WrappedAsyncResult { - - public AnonymousRelayRequest(AsyncResult openResult) { - super(openResult); - } - - /** - * If creation of the producer to the anonymous-relay failed, we try to - * enter fallback mode rather than immediately failing. 
- */ - @Override - public void onFailure(Throwable result) { - LOG.debug("Attempt to open producer to anonymous relay failed, entering fallback mode"); - - AmqpProducer newProducer = new AmqpAnonymousFallbackProducer(session, getJmsResource()); - newProducer.setPresettle(delegate.isPresettle()); - delegate = newProducer; - - delegate.open(getWrappedRequest()); - } - } -} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03be3ff0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index 1cc5005..2badbcd 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -31,7 +31,6 @@ import org.apache.qpid.jms.meta.JmsSessionInfo; import org.apache.qpid.jms.provider.AsyncResult; import org.apache.qpid.jms.provider.amqp.message.AmqpJmsMessageFactory; import org.apache.qpid.jms.util.IOExceptionSupport; -import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Sasl; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03be3ff0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java index 814c0f0..c3b0297 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java +++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java @@ -16,6 +16,8 @@ */ package org.apache.qpid.jms.provider.amqp; +import java.util.Arrays; +import java.util.List; import java.util.Map; import org.apache.qpid.proton.amqp.Symbol; @@ -27,6 +29,10 @@ import org.apache.qpid.proton.amqp.Symbol; */ public class AmqpConnectionProperties { + public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY"); + + private boolean anonymousRelaySupported = false; + /** * Creates a new instance of this class from the given remote capabilities and properties. * @@ -46,10 +52,19 @@ public class AmqpConnectionProperties { } protected void processCapabilities(Symbol[] capabilities) { - // TODO - Inspect capabilities for configuration options + List<Symbol> list = Arrays.asList(capabilities); + if (list.contains(ANONYMOUS_RELAY)) { + anonymousRelaySupported = true; + } + + // TODO - Inspect capabilities for any other configuration options } protected void processProperties(Map<Symbol, Object> properties) { // TODO - Inspect properties for configuration options } + + public boolean isAnonymousRelaySupported() { + return anonymousRelaySupported; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03be3ff0/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index d707f35..d6e0abc 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -102,12 +102,12 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> { public AmqpProducer createProducer(JmsProducerInfo producerInfo) { AmqpProducer producer = 
null; - if (producerInfo.getDestination() != null) { + if (producerInfo.getDestination() != null || connection.getProperties().isAnonymousRelaySupported()) { LOG.debug("Creating AmqpFixedProducer for: {}", producerInfo.getDestination()); producer = new AmqpFixedProducer(this, producerInfo); } else { - LOG.debug("Creating an AmqpAnonymousProducerWrapper"); - producer = new AmqpAnonymousProducerWrapper(this, producerInfo); + LOG.debug("Creating an AmqpAnonymousFallbackProducer"); + producer = new AmqpAnonymousFallbackProducer(this, producerInfo); } producer.setPresettle(connection.isPresettleProducers()); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/03be3ff0/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java index ed97272..93fae3c 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SessionIntegrationTest.java @@ -41,6 +41,7 @@ import javax.jms.TextMessage; import javax.jms.Topic; import javax.jms.TopicSubscriber; +import org.apache.qpid.jms.provider.amqp.AmqpConnectionProperties; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.DescriptorMatcher; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; @@ -58,6 +59,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionM import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher; import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; import org.junit.Test; public class 
SessionIntegrationTest extends QpidJmsTestCase { @@ -184,7 +186,10 @@ public class SessionIntegrationTest extends QpidJmsTestCase { @Test(timeout = 5000) public void testCreateAnonymousProducerWhenAnonymousRelayNodeIsSupported() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { - Connection connection = testFixture.establishConnecton(testPeer); + //Add capability to indicate support for ANONYMOUS-RELAY + Symbol[] serverCapabilities = new Symbol[]{AmqpConnectionProperties.ANONYMOUS_RELAY}; + + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); connection.start(); testPeer.expectBegin(true); @@ -227,16 +232,54 @@ public class SessionIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 5000) - public void testCreateProducerFailsWhenLinkRefusedAndAttachFrameWriteIsNotDeferred() throws Exception { + public void testCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception { + doCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedTestImpl(false); + } + + @Test(timeout = 5000) + public void testCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedAndAttachResponseWriteIsDeferred() throws Exception { + doCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedTestImpl(true); + } + + private void doCreateAnonymousProducerFailsWhenAnonymousRelayNodeIsSupportedButLinkRefusedTestImpl(boolean deferAttachFrameWrite) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + //Add capability to indicate support for ANONYMOUS-RELAY + Symbol[] serverCapabilities = new Symbol[]{AmqpConnectionProperties.ANONYMOUS_RELAY}; + + Connection connection = testFixture.establishConnecton(testPeer, serverCapabilities); + connection.start(); + + testPeer.expectBegin(true); + Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE); + + //Expect and refuse a link to the anonymous relay node + TargetMatcher targetMatcher = new TargetMatcher(); + targetMatcher.withAddress(nullValue()); + targetMatcher.withDynamic(nullValue());//default = false + targetMatcher.withDurable(nullValue());//default = none/0 + + testPeer.expectSenderAttach(targetMatcher, true, false); + + try { + session.createProducer(null); + fail("Expected producer creation to fail if anonymous-relay link refused"); + } catch (JMSException jmse) { + //expected + } + } + } + + @Test(timeout = 5000) + public void testCreateProducerFailsWhenLinkRefusedAndAttachResponseWriteIsNotDeferred() throws Exception { doCreateProducerFailsWhenLinkRefusedTestImpl(false); } @Test(timeout = 5000) - public void testCreateProducerFailsWhenLinkRefusedAndAttachFrameWriteIsDeferred() throws Exception { + public void testCreateProducerFailsWhenLinkRefusedAndAttachResponseWriteIsDeferred() throws Exception { doCreateProducerFailsWhenLinkRefusedTestImpl(true); } - private void doCreateProducerFailsWhenLinkRefusedTestImpl(boolean deferAttachFrameWrite) throws JMSException, InterruptedException, Exception, IOException { + private void doCreateProducerFailsWhenLinkRefusedTestImpl(boolean deferAttachResponseWrite) throws JMSException, InterruptedException, Exception, IOException { try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { Connection connection = testFixture.establishConnecton(testPeer); connection.start(); @@ -253,7 +296,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { targetMatcher.withDynamic(nullValue());//default = false targetMatcher.withDurable(nullValue());//default = none/0 - testPeer.expectSenderAttach(targetMatcher, true, deferAttachFrameWrite); + testPeer.expectSenderAttach(targetMatcher, true, deferAttachResponseWrite); //Expect the detach response to the test peer closing the producer link after refusal. 
testPeer.expectDetach(true, false, false); @@ -272,6 +315,9 @@ public class SessionIntegrationTest extends QpidJmsTestCase { @Test(timeout = 5000) public void testCreateAnonymousProducerWhenAnonymousRelayNodeIsNotSupported() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer(IntegrationTestFixture.PORT);) { + + //DO NOT add capability to indicate server support for ANONYMOUS-RELAY + Connection connection = testFixture.establishConnecton(testPeer); connection.start(); @@ -281,15 +327,8 @@ public class SessionIntegrationTest extends QpidJmsTestCase { String topicName = "myTopic"; Topic dest = session.createTopic(topicName); - //Expect and refuse a link to the anonymous relay node - TargetMatcher targetMatcher = new TargetMatcher(); - targetMatcher.withAddress(nullValue()); - targetMatcher.withDynamic(nullValue());//default = false - targetMatcher.withDurable(nullValue());//default = none/0 - - testPeer.expectSenderAttach(targetMatcher, true, false); - //Expect the detach response to the test peer closing the producer link after refusal. - testPeer.expectDetach(true, false, false); + // Expect no AMQP traffic when we create the anonymous producer, as it will wait + // for an actual send to occur on the producer before anything occurs on the wire //Create an anonymous producer MessageProducer producer = session.createProducer(null); @@ -297,7 +336,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { //Expect a new message sent by the above producer to cause creation of a new //sender link to the given destination, then closing the link after the message is sent. 
- TargetMatcher targetMatcher2 = new TargetMatcher(); + TargetMatcher targetMatcher = new TargetMatcher(); targetMatcher.withAddress(equalTo("topic://" + topicName)); //TODO: remove prefix targetMatcher.withDynamic(nullValue());//default = false targetMatcher.withDurable(nullValue());//default = none/0 @@ -308,7 +347,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { messageMatcher.setHeadersMatcher(headersMatcher); messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); - testPeer.expectSenderAttach(targetMatcher2, false, false); + testPeer.expectSenderAttach(targetMatcher, false, false); testPeer.expectTransfer(messageMatcher); testPeer.expectDetach(true, true, true); @@ -316,7 +355,7 @@ public class SessionIntegrationTest extends QpidJmsTestCase { producer.send(dest, message); //Repeat the send and observe another attach->transfer->detach. - testPeer.expectSenderAttach(targetMatcher2, false, false); + testPeer.expectSenderAttach(targetMatcher, false, false); testPeer.expectTransfer(messageMatcher); testPeer.expectDetach(true, true, true); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
