Repository: qpid-jms Updated Branches: refs/heads/master 777cc4614 -> c3e37f272
QPIDJMS-173 Add options to control consumer presettle state. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/c3e37f27 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/c3e37f27 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/c3e37f27 Branch: refs/heads/master Commit: c3e37f27286b4e91d3bf0ac6fceb3c0ac7956dfd Parents: 777cc46 Author: Timothy Bish <[email protected]> Authored: Tue May 3 16:19:26 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue May 3 16:19:26 2016 -0400 ---------------------------------------------------------------------- .../org/apache/qpid/jms/JmsMessageConsumer.java | 1 + .../org/apache/qpid/jms/JmsMessageProducer.java | 2 +- .../org/apache/qpid/jms/JmsPresettlePolicy.java | 116 +++++++++++++- .../java/org/apache/qpid/jms/JmsSession.java | 2 +- .../PresettledConsumerIntegrationTest.java | 153 ++++++++++++++++--- 5 files changed, 248 insertions(+), 26 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c3e37f27/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java index 826438c..46f19ca 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java @@ -98,6 +98,7 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe consumerInfo.setPrefetchSize(getConfiguredPrefetch(destination, policy)); consumerInfo.setRedeliveryPolicy(redeliveryPolicy); consumerInfo.setLocalMessageExpiry(connection.isLocalMessageExpiry()); + consumerInfo.setPresettle(session.getPresettlePolicy().isConsumerPresttled(destination, session)); session.getConnection().createResource(consumerInfo); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c3e37f27/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java index 84e4017..0cb1646 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java @@ -56,7 +56,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { this.anonymousProducer = destination == null; this.producerInfo = new JmsProducerInfo(producerId); this.producerInfo.setDestination(destination); - this.producerInfo.setPresettle(session.getPresettlePolicy().isSendPresttled(destination, session)); + this.producerInfo.setPresettle(session.getPresettlePolicy().isProducerPresttled(destination, session)); session.getConnection().createResource(producerInfo); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c3e37f27/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java index c6079d6..bf15ef0 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java @@ -23,11 +23,17 @@ package org.apache.qpid.jms; public class JmsPresettlePolicy { private boolean presettleAll; + private boolean presettleProducers; private boolean presettleTopicProducers; private boolean presettleQueueProducers; private boolean presettleTransactedProducers; + private boolean presettleConsumers; + private boolean presettleTopicConsumers; + private boolean presettleQueueConsumers; + private boolean presettleTransactedConsumers; + public JmsPresettlePolicy() { } @@ -37,6 +43,10 @@ public class JmsPresettlePolicy { this.presettleTopicProducers = source.presettleTopicProducers; this.presettleQueueProducers = source.presettleQueueProducers; this.presettleTransactedProducers = source.presettleTransactedProducers; + this.presettleConsumers = source.presettleConsumers; + this.presettleTopicConsumers = source.presettleTopicConsumers; + this.presettleQueueConsumers = source.presettleQueueConsumers; + this.presettleTransactedConsumers = source.presettleTransactedConsumers; } public JmsPresettlePolicy copy() { @@ -148,11 +158,11 @@ public class JmsPresettlePolicy { * @param destination * the destination that the producer will be sending to. * @param session - * the session that owns the producer that will send be sending a message. + * the session that owns the producer. * * @return true if the producer should send presettled. */ - public boolean isSendPresttled(JmsDestination destination, JmsSession session) { + public boolean isProducerPresttled(JmsDestination destination, JmsSession session) { if (presettleAll || presettleProducers) { return true; @@ -166,4 +176,106 @@ public class JmsPresettlePolicy { return false; } + + /** + * @return the presettleConsumers configuration value for this policy. + */ + public boolean isPresettleConsumers() { + return presettleConsumers; + } + + /** + * The presettle all consumers value to apply. When true all MessageConsumer + * instances created will indicate that presettled messages are requested. + * + * @param presettleConsumers + * the presettleConsumers value to apply to this policy. + */ + public void setPresettleConsumers(boolean presettleConsumers) { + this.presettleConsumers = presettleConsumers; + } + + /** + * @return the presettleTopicConsumers setting for this policy. + */ + public boolean isPresettleTopicConsumers() { + return presettleTopicConsumers; + } + + /** + * The presettle Topic consumers value to apply. When true any MessageConsumer for + * a Topic destination will indicate that presettled messages are requested. + * + * @param presettleTopicConsumers + * the presettleTopicConsumers value to apply to this policy. + */ + public void setPresettleTopicConsumers(boolean presettleTopicConsumers) { + this.presettleTopicConsumers = presettleTopicConsumers; + } + + /** + * @return the presettleQueueConsumers setting for this policy. + */ + public boolean isPresettleQueueConsumers() { + return presettleQueueConsumers; + } + + /** + * The presettle Queue consumers value to apply. When true any MessageConsumer for + * a Queue destination will indicate that presettled messages are requested. + * + * @param presettleQueueConsumers + * the presettleQueueConsumers value to apply to this policy. + */ + public void setPresettleQueueConsumers(boolean presettleQueueConsumers) { + this.presettleQueueConsumers = presettleQueueConsumers; + } + + /** + * @return the presettleTransactedConsumers setting for this policy. + */ + public boolean isPresettleTransactedConsumers() { + return presettleTransactedConsumers; + } + + /** + * The presettle consumers inside a transaction value to apply. When true all the + * MessageConsumer created in a transacted Session will indicate that presettled + * messages are requested. + * + * @param presettleTransactedConsumers + * the presettleTransactedConsumers value to apply to this policy. + */ + public void setPresettleTransactedConsumers(boolean presettleTransactedConsumers) { + this.presettleTransactedConsumers = presettleTransactedConsumers; + } + + /** + * Determines when a consumer will be created with the settlement mode set to presettled. + * <p> + * Called when the a consumer is being created to determine whether the consumer will + * be configured to request that the remote sends it message that are presettled. + * <p> + * + * @param destination + * the destination that the consumer will be listening to. + * @param session + * the session that owns the consumer being created. + * + * @return true if the producer should send presettled. + */ + public boolean isConsumerPresttled(JmsDestination destination, JmsSession session) { + + if (presettleAll || presettleConsumers) { + return true; + } else if (session.isTransacted() && presettleTransactedConsumers) { + return true; + } else if (destination != null && destination.isQueue() && presettleQueueConsumers) { + return true; + } else if (destination != null && destination.isTopic() && presettleTopicConsumers) { + return true; + } + + return false; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c3e37f27/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index a3e2b27..64c514e 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -699,7 +699,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe envelope.setDispatchId(messageSequence); if (producer.isAnonymous()) { - envelope.setPresettle(presettlePolicy.isSendPresttled(destination, this)); + envelope.setPresettle(presettlePolicy.isProducerPresttled(destination, this)); } transactionContext.send(connection, envelope); http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/c3e37f27/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java index bfddcf5..ff98387 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledConsumerIntegrationTest.java @@ -48,77 +48,183 @@ public class PresettledConsumerIntegrationTest extends QpidJmsTestCase { //----- Test the amqp.presettleConsumers option --------------------------// @Test(timeout = 20000) - public void testPresettledProducersConfigurationAppliedToTopic() throws Exception { + public void testPresettledConsumersConfigurationAppliedToTopic() throws Exception { String presettleConfig = "?amqp.presettleConsumers=true"; doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class); } @Test(timeout = 20000) - public void testPresettledProducersConfigurationAppliedToQueue() throws Exception { + public void testPresettledConsumersConfigurationAppliedToQueue() throws Exception { String presettleConfig = "?amqp.presettleConsumers=true"; doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class); } @Test(timeout = 20000) - public void testPresettledProducersConfigurationAppliedToTempTopic() throws Exception { + public void testPresettledConsumersConfigurationAppliedToTempTopic() throws Exception { String presettleConfig = "?amqp.presettleConsumers=true"; doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class); } @Test(timeout = 20000) - public void testPresettledProducersConfigurationAppliedToTempQueue() throws Exception { + public void testPresettledConsumersConfigurationAppliedToTempQueue() throws Exception { String presettleConfig = "?amqp.presettleConsumers=true"; doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class); } + //----- Test the jms.presettlePolicy.presettleAll option -----------------// + @Test(timeout = 20000) - public void testPresettledProducersConfigurationAppliedAnonymousSendToTopic() throws Exception { - String presettleConfig = "?amqp.presettleConsumers=true"; + public void testPresettleAllConfigurationAppliedToTopic() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleAll=true"; doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class); } @Test(timeout = 20000) - public void testPresettledProducersConfigurationAppliedAnonymousSendToQueue() throws Exception { - String presettleConfig = "?amqp.presettleConsumers=true"; + public void testPresettledAllConfigurationAppliedToQueue() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleAll=true"; doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class); } @Test(timeout = 20000) - public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopic() throws Exception { - String presettleConfig = "?amqp.presettleConsumers=true"; + public void testPresettledAllConfigurationAppliedToTempTopic() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleAll=true"; doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class); } @Test(timeout = 20000) - public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueue() throws Exception { - String presettleConfig = "?amqp.presettleConsumers=true"; + public void testPresettledAllConfigurationAppliedToTempQueue() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleAll=true"; doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class); } + //----- Test the jms.presettlePolicy.presettleTopicConsumers option ------// + @Test(timeout = 20000) - public void testPresettledProducersConfigurationAppliedAnonymousSendToTopicNoRelaySupport() throws Exception { - String presettleConfig = "?amqp.presettleConsumers=true"; + public void testPresettleTopicConsumersConfigurationAppliedToTopic() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true"; doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Topic.class); } @Test(timeout = 20000) - public void testPresettledProducersConfigurationAppliedAnonymousSendToQueueNoRelaySupport() throws Exception { - String presettleConfig = "?amqp.presettleConsumers=true"; - doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class); + public void testPresettledTopicConsumersConfigurationAppliedToQueue() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, Queue.class); } @Test(timeout = 20000) - public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopicNoRelaySupport() throws Exception { - String presettleConfig = "?amqp.presettleConsumers=true"; + public void testPresettledTopicConsumersConfigurationAppliedToTempTopic() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true"; doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryTopic.class); } @Test(timeout = 20000) - public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueueNoRelaySupport() throws Exception { - String presettleConfig = "?amqp.presettleConsumers=true"; + public void testPresettledTopicConsumersConfigurationAppliedToTempQueue() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleTopicConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, TemporaryQueue.class); + } + + //----- Test the jms.presettlePolicy.presettleQueueConsumers option ----- // + + @Test(timeout = 20000) + public void testPresettleQueueConsumersConfigurationAppliedToTopic() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, Topic.class); + } + + @Test(timeout = 20000) + public void testPresettledQueueConsumersConfigurationAppliedToQueue() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, Queue.class); + } + + @Test(timeout = 20000) + public void testPresettledQueueConsumersConfigurationAppliedToTempTopic() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, TemporaryTopic.class); + } + + @Test(timeout = 20000) + public void testPresettledQueueConsumersConfigurationAppliedToTempQueue() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleQueueConsumers=true"; doTestConsumerWithPresettleOptions(presettleConfig, false, true, true, TemporaryQueue.class); } + //----- Test the jms.presettlePolicy.presettleTransactedConsumers option -// + + @Test(timeout = 20000) + public void testPresettleTransactedConsumersConfigurationAppliedToTopic() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, true, true, true, Topic.class); + } + + @Test(timeout = 20000) + public void testPresettledTransactedConsumersConfigurationAppliedToQueue() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, true, true, true, Queue.class); + } + + @Test(timeout = 20000) + public void testPresettledTransactedConsumersConfigurationAppliedToTempTopic() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, true, true, true, TemporaryTopic.class); + } + + @Test(timeout = 20000) + public void testPresettledTransactedConsumersConfigurationAppliedToTempQueue() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, true, true, true, TemporaryQueue.class); + } + + @Test(timeout = 20000) + public void testPresettleTransactedConsumersConfigurationAppliedToTopicNoTX() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, Topic.class); + } + + @Test(timeout = 20000) + public void testPresettledTransactedConsumersConfigurationAppliedToQueueNoTX() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, Queue.class); + } + + @Test(timeout = 20000) + public void testPresettledTransactedConsumersConfigurationAppliedToTempTopicNoTX() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, TemporaryTopic.class); + } + + @Test(timeout = 20000) + public void testPresettledTransactedConsumersConfigurationAppliedToTempQueueNoTX() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleTransactedConsumers=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, false, false, TemporaryQueue.class); + } + + //----- Test the presettled consumer still settles if needed -------------// + + @Test(timeout = 20000) + public void testPresettledTopicConsumerSettlesWhenNeeded() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleAll=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, false, Topic.class); + } + + @Test(timeout = 20000) + public void testPresettledQueueConsumerSettlesWhenNeeded() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleAll=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, false, Queue.class); + } + + @Test(timeout = 20000) + public void testPresettledTempTopicConsumerSettlesWhenNeeded() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleAll=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, false, TemporaryTopic.class); + } + + @Test(timeout = 20000) + public void testPresettledTempQueueConsumerSettlesWhenNeeded() throws Exception { + String presettleConfig = "?jms.presettlePolicy.presettleAll=true"; + doTestConsumerWithPresettleOptions(presettleConfig, false, true, false, TemporaryQueue.class); + } + //----- Test Method implementation ---------------------------------------// private void doTestConsumerWithPresettleOptions(String uriOptions, boolean transacted, boolean senderSettled, boolean transferSettled, Class<? extends Destination> destType) throws Exception { @@ -174,7 +280,10 @@ public class PresettledConsumerIntegrationTest extends QpidJmsTestCase { } // Send a settled transfer, client should not send any dispositions - testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, true); + testPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent, transferSettled); + if (!transferSettled) { + testPeer.expectDispositionThatIsAcceptedAndSettled(); + } MessageConsumer consumer = session.createConsumer(destination); assertNotNull(consumer.receive(100)); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
