Repository: qpid-jms Updated Branches: refs/heads/master 176640f1d -> 777cc4614
https://issues.apache.org/jira/browse/QPIDJMS-173 Some initial refactoring and clean up, adds a test for the global presettle consumer option. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/777cc461 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/777cc461 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/777cc461 Branch: refs/heads/master Commit: 777cc4614b7f36cca30311a5dcd8f37a7a393ddd Parents: 176640f Author: Timothy Bish <[email protected]> Authored: Mon May 2 18:09:13 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon May 2 18:09:13 2016 -0400 ---------------------------------------------------------------------- .../apache/qpid/jms/meta/JmsConsumerInfo.java | 9 + .../qpid/jms/provider/amqp/AmqpConsumer.java | 17 +- .../amqp/builders/AmqpConsumerBuilder.java | 2 +- .../PresettledConsumerIntegrationTest.java | 192 +++++++++++++++++++ .../qpid/jms/test/testpeer/TestAmqpPeer.java | 17 ++ 5 files changed, 226 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java index a4a4b2a..f5b791a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConsumerInfo.java @@ -31,6 +31,7 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume private boolean noLocal; private int acknowledgementMode; private boolean localMessageExpiry; + private boolean presettle; private JmsRedeliveryPolicy redeliveryPolicy; @@ -171,6 +172,14 @@ public final class JmsConsumerInfo implements JmsResource, Comparable<JmsConsume this.redeliveryPolicy = redeliveryPolicy; } + public boolean isPresettle() { + return presettle; + } + + public void setPresettle(boolean presettle) { + this.presettle = presettle; + } + @Override public String toString() { return "JmsConsumerInfo: { " + getId() + ", destination = " + getDestination() + " }"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java index 916949c..8f5e651 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConsumer.java @@ -19,8 +19,6 @@ package org.apache.qpid.jms.provider.amqp; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.MODIFIED_FAILED_UNDELIVERABLE; import static org.apache.qpid.jms.provider.amqp.AmqpSupport.REJECTED; -import io.netty.buffer.ByteBuf; -import io.netty.buffer.Unpooled; import java.io.IOException; import java.util.ArrayList; @@ -53,6 +51,9 @@ import org.apache.qpid.proton.message.Message; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; + /** * AMQP Consumer object that is used to manage JMS MessageConsumer semantics. */ @@ -225,15 +226,15 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver if (ackType.equals(ACK_TYPE.DELIVERED)) { LOG.debug("Delivered Ack of message: {}", envelope); - if (!isPresettle()) { + if (!delivery.isSettled()) { delivered.put(envelope, delivery); + delivery.setDefaultDeliveryState(MODIFIED_FAILED); } - delivery.setDefaultDeliveryState(MODIFIED_FAILED); sendFlowIfNeeded(); } else if (ackType.equals(ACK_TYPE.ACCEPTED)) { // A Consumer may not always send a DELIVERED ack so we need to // check to ensure we don't add too much credit to the link. - if (isPresettle() || delivered.remove(envelope) == null) { + if (delivery.isSettled() || delivered.remove(envelope) == null) { sendFlowIfNeeded(); } LOG.debug("Accepted Ack of message: {}", envelope); @@ -503,11 +504,7 @@ public class AmqpConsumer extends AmqpAbstractResource<JmsConsumerInfo, Receiver protected void deliver(JmsInboundMessageDispatch envelope) throws Exception { ProviderListener listener = session.getProvider().getProviderListener(); if (listener != null) { - if (envelope.getMessage() != null) { - LOG.debug("Dispatching received message: {}", envelope); - } else { - LOG.debug("Dispatching end of pull/browse to: {}", envelope.getConsumerId()); - } + LOG.debug("Dispatching received message: {}", envelope); listener.onInboundMessage(envelope); } else { LOG.error("Provider listener is not set, message will be dropped: {}", envelope); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java index 6361682..89b6d94 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpConsumerBuilder.java @@ -77,7 +77,7 @@ public class AmqpConsumerBuilder extends AmqpResourceBuilder<AmqpConsumer, AmqpS Receiver receiver = getParent().getEndpoint().receiver(receiverName); receiver.setSource(source); receiver.setTarget(target); - if (getParent().getConnection().isPresettleConsumers() || resourceInfo.isBrowser()) { + if (resourceInfo.isBrowser() || resourceInfo.isPresettle() || getParent().getConnection().isPresettleConsumers()) { receiver.setSenderSettleMode(SenderSettleMode.SETTLED); } else { receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java new file mode 100644 index 0000000..bfddcf5 --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java @@ -0,0 +1,192 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.integration; + +import static org.hamcrest.Matchers.arrayContaining; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.fail; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.MessageConsumer; +import javax.jms.Queue; +import javax.jms.Session; +import javax.jms.TemporaryQueue; +import javax.jms.TemporaryTopic; +import javax.jms.Topic; + +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.apache.qpid.jms.test.testpeer.describedtypes.sections.AmqpValueDescribedType; +import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.transaction.TxnCapability; +import org.junit.Test; + +/** + * Test for Consumer state when various consumer presettle options are applied. + */ +public class PresettledConsumerIntegrationTest extends QpidJmsTestCase { + + private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); + + //----- Test the amqp.presettleConsumers option --------------------------// + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedToTopic() throws Exception { + String presettleConfig = "?amqp.presettleConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class); + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedToQueue() throws Exception { + String presettleConfig = "?amqp.presettleConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class); + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedToTempTopic() throws Exception { + String presettleConfig = "?amqp.presettleConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class); + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedToTempQueue() throws Exception { + String presettleConfig = "?amqp.presettleConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class); + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedAnonymousSendToTopic() throws Exception { + String presettleConfig = "?amqp.presettleConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class); + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedAnonymousSendToQueue() throws Exception { + String presettleConfig = "?amqp.presettleConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class); + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopic() throws Exception { + String presettleConfig = "?amqp.presettleConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class); + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueue() throws Exception { + String presettleConfig = "?amqp.presettleConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class); + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedAnonymousSendToTopicNoRelaySupport() throws Exception { + String presettleConfig = "?amqp.presettleConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class); + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedAnonymousSendToQueueNoRelaySupport() throws Exception { + String presettleConfig = "?amqp.presettleConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class); + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopicNoRelaySupport() throws Exception { + String presettleConfig = "?amqp.presettleConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class); + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueueNoRelaySupport() throws Exception { + String presettleConfig = "?amqp.presettleConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class); + } + + //----- Test Method implementation ---------------------------------------// + + private void doTestConsumerWithPresettleOptions(String uriOptions, boolean transacted, boolean senderSettled, boolean transferSettled, Class<? extends Destination> destType) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, uriOptions); + connection.start(); + testPeer.expectBegin(); + + Session session = null; + Binary txnId = null; + + if (transacted) { + // Expect the session, with an immediate link to the transaction coordinator + // using a target with the expected capabilities only. + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN)); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. + txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + testPeer.expectDeclare(txnId); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + } else { + // Use client ack so the receipt of the settled disposition is controllable. + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + Destination destination = null; + if (destType == Queue.class) { + destination = session.createQueue("MyQueue"); + } else if (destType == Topic.class) { + destination = session.createTopic("MyTopis"); + } else if (destType == TemporaryQueue.class) { + String dynamicAddress = "myTempQueueAddress"; + testPeer.expectTempQueueCreationAttach(dynamicAddress); + destination = session.createTemporaryQueue(); + } else if (destType == TemporaryTopic.class) { + String dynamicAddress = "myTempTopicAddress"; + testPeer.expectTempTopicCreationAttach(dynamicAddress); + destination = session.createTemporaryTopic(); + } else { + fail("unexpected type"); + } + + DescribedType amqpValueNullContent = new AmqpValueDescribedType(null); + + if (senderSettled) { + testPeer.expectSettledReceiverAttach(); + } else { + testPeer.expectReceiverAttach(); + } + + // Send a settled transfer, client should not send any dispositions + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, true); + + MessageConsumer consumer = session.createConsumer(destination); + assertNotNull(consumer.receive(100)); + + if (transacted) { + testPeer.expectDischarge(txnId, true); + } + + testPeer.expectClose(); + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/777cc461/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index eb9cba4..8e15f85 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -1035,6 +1035,11 @@ public class TestAmqpPeer implements AutoCloseable expectReceiverAttach(notNullValue(), notNullValue()); } + public void expectSettledReceiverAttach() + { + expectReceiverAttach(notNullValue(), notNullValue(), true, false, false, false, null, null); + } + public void expectReceiverAttach(final Matcher<?> linkNameMatcher, final Matcher<?> sourceMatcher) { expectReceiverAttach(linkNameMatcher, sourceMatcher, false, false, false, false, null, null); @@ -1229,6 +1234,18 @@ public class TestAmqpPeer implements AutoCloseable final PropertiesDescribedType propertiesDescribedType, final ApplicationPropertiesDescribedType appPropertiesDescribedType, final DescribedType content, + final boolean sendSettled) + { + expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, + appPropertiesDescribedType, content, 1, false, false, + Matchers.greaterThanOrEqualTo(UnsignedInteger.valueOf(1)), 1, sendSettled, false); + } + + public void expectLinkFlowRespondWithTransfer(final HeaderDescribedType headerDescribedType, + final MessageAnnotationsDescribedType messageAnnotationsDescribedType, + final PropertiesDescribedType propertiesDescribedType, + final ApplicationPropertiesDescribedType appPropertiesDescribedType, + final DescribedType content, final int count) { expectLinkFlowRespondWithTransfer(headerDescribedType, messageAnnotationsDescribedType, propertiesDescribedType, --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
