Repository: qpid-jms Updated Branches: refs/heads/master 4915a8fdf -> bac662d54
https://issues.apache.org/jira/browse/QPIDJMS-131 Initial pass of addition of JmsPresettlePolicy to allow finer control over presettlement in the JMS client. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/bac662d5 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/bac662d5 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/bac662d5 Branch: refs/heads/master Commit: bac662d540ab51e418ef2b328c56507d2f9f13cb Parents: 4915a8f Author: Timothy Bish <[email protected]> Authored: Fri Apr 29 18:18:48 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Fri Apr 29 18:18:48 2016 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 13 +- .../apache/qpid/jms/JmsConnectionFactory.java | 60 +- .../org/apache/qpid/jms/JmsMessageProducer.java | 17 +- .../org/apache/qpid/jms/JmsPresettlePolicy.java | 169 ++++++ .../apache/qpid/jms/JmsRedeliveryPolicy.java | 30 + .../java/org/apache/qpid/jms/JmsSession.java | 14 +- .../apache/qpid/jms/meta/JmsConnectionInfo.java | 24 + .../apache/qpid/jms/meta/JmsProducerInfo.java | 19 + .../amqp/AmqpAnonymousFallbackProducer.java | 6 +- .../qpid/jms/provider/amqp/AmqpConnection.java | 1 + .../jms/provider/amqp/AmqpFixedProducer.java | 9 +- .../amqp/builders/AmqpProducerBuilder.java | 10 +- .../PresettledProducerIntegrationTest.java | 563 +++++++++++++++++++ .../integration/ProducerIntegrationTest.java | 32 -- 14 files changed, 883 insertions(+), 84 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index fd2d5be..4374954 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -101,7 +101,6 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection private ExceptionListener exceptionListener; private JmsMessageFactory messageFactory; private Provider provider; - private JmsMessageIDBuilder messageIDBuilder; private final Set<JmsConnectionListener> connectionListeners = new CopyOnWriteArraySet<JmsConnectionListener>(); @@ -863,6 +862,14 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection connectionInfo.setRedeliveryPolicy(redeliveryPolicy); } + public JmsPresettlePolicy getPresettlePolicy() { + return connectionInfo.getPresettlePolicy(); + } + + public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) { + connectionInfo.setPresettlePolicy(presettlePolicy); + } + public boolean isReceiveLocalOnly() { return connectionInfo.isReceiveLocalOnly(); } @@ -1003,11 +1010,11 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection } public JmsMessageIDBuilder getMessageIDBuilder() { - return messageIDBuilder; + return connectionInfo.getMessageIDBuilder(); } void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) { - this.messageIDBuilder = messageIDBuilder; + connectionInfo.setMessageIDBuilder(messageIDBuilder); } public boolean isPopulateJMSXUserID() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java index 087f18d..9fb708d 100644 --- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java @@ -86,6 +86,7 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy(); private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy(); + private JmsPresettlePolicy presettlePolicy = new JmsPresettlePolicy(); private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder(); public JmsConnectionFactory() { @@ -290,39 +291,31 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact try { if (this.remoteURI.getQuery() != null) { Map<String, String> map = PropertyUtil.parseQuery(this.remoteURI.getQuery()); - Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(map, "jms."); - - Map<String, String> unused = PropertyUtil.setProperties(this, jmsOptionsMap); - if (!unused.isEmpty()) { - String msg = "" - + " Not all jms options could be set on the ConnectionFactory." - + " Check the options are spelled correctly." - + " Unused parameters=[" + unused + "]." - + " This connection factory cannot be started."; - throw new IllegalArgumentException(msg); - } else { - this.remoteURI = PropertyUtil.replaceQuery(this.remoteURI, map); - } + applyURIOptions(map); + this.remoteURI = PropertyUtil.replaceQuery(this.remoteURI, map); } else if (URISupport.isCompositeURI(this.remoteURI)) { CompositeData data = URISupport.parseComposite(this.remoteURI); - Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(data.getParameters(), "jms."); - Map<String, String> unused = PropertyUtil.setProperties(this, jmsOptionsMap); - if (!unused.isEmpty()) { - String msg = "" - + " Not all jms options could be set on the ConnectionFactory." - + " Check the options are spelled correctly." - + " Unused parameters=[" + unused + "]." 
- + " This connection factory cannot be started."; - throw new IllegalArgumentException(msg); - } else { - this.remoteURI = data.toURI(); - } + applyURIOptions(data.getParameters()); + this.remoteURI = data.toURI(); } } catch (Exception e) { throw new IllegalArgumentException(e.getMessage()); } } + private void applyURIOptions(Map<String, String> options) throws IllegalArgumentException { + Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(options, "jms."); + Map<String, String> unused = PropertyUtil.setProperties(this, jmsOptionsMap); + if (!unused.isEmpty()) { + String msg = "" + + " Not all jms options could be set on the ConnectionFactory." + + " Check the options are spelled correctly." + + " Unused parameters=[" + unused + "]." + + " This connection factory cannot be started."; + throw new IllegalArgumentException(msg); + } + } + /** * @return the user name used for connection authentication. */ @@ -524,6 +517,23 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact } /** + * @return the presettlePolicy that is currently configured. + */ + public JmsPresettlePolicy getPresettlePolicy() { + return presettlePolicy; + } + + /** + * Sets the JmsPresettlePolicy that is applied to MessageProducers. + * + * @param presettlePolicy + * the presettlePolicy to use by connections created from this factory. + */ + public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) { + this.presettlePolicy = presettlePolicy; + } + + /** * @return the currently configured client ID prefix for auto-generated client IDs. 
*/ public synchronized String getClientIDPrefix() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java index 600f26e..84e4017 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java @@ -40,7 +40,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { protected final JmsSession session; protected final JmsConnection connection; protected JmsProducerInfo producerInfo; - protected final boolean flexibleDestination; + protected final boolean anonymousProducer; protected int deliveryMode = DeliveryMode.PERSISTENT; protected int priority = Message.DEFAULT_PRIORITY; protected long timeToLive = Message.DEFAULT_TIME_TO_LIVE; @@ -53,9 +53,10 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { protected JmsMessageProducer(JmsProducerId producerId, JmsSession session, JmsDestination destination) throws JMSException { this.session = session; this.connection = session.getConnection(); - this.flexibleDestination = destination == null; + this.anonymousProducer = destination == null; this.producerInfo = new JmsProducerInfo(producerId); this.producerInfo.setDestination(destination); + this.producerInfo.setPresettle(session.getPresettlePolicy().isSendPresttled(destination, session)); session.getConnection().createResource(producerInfo); } @@ -141,7 +142,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { checkClosed(); - if (flexibleDestination) { + if (anonymousProducer) { throw 
new UnsupportedOperationException("Using this method is not supported on producers created without an explicit Destination"); } @@ -157,7 +158,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException { checkClosed(); - if (!flexibleDestination) { + if (!anonymousProducer) { throw new UnsupportedOperationException("Using this method is not supported on producers created with an explicit Destination."); } @@ -231,6 +232,14 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { } } + protected boolean isPresettled() { + return producerInfo.isPresettle(); + } + + protected boolean isAnonymous() { + return anonymousProducer; + } + //////////////////////////////////////////////////////////////////////////// // Connection interruption handlers. //////////////////////////////////////////////////////////////////////////// http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java new file mode 100644 index 0000000..c6079d6 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsPresettlePolicy.java @@ -0,0 +1,169 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms; + +/** + * Policy object that allows for configuration of options that affect when + * a JMS MessageProducer will result in AMQP presettled message sends. + */ +public class JmsPresettlePolicy { + + private boolean presettleAll; + private boolean presettleProducers; + private boolean presettleTopicProducers; + private boolean presettleQueueProducers; + private boolean presettleTransactedProducers; + + public JmsPresettlePolicy() { + } + + public JmsPresettlePolicy(JmsPresettlePolicy source) { + this.presettleAll = source.presettleAll; + this.presettleProducers = source.presettleProducers; + this.presettleTopicProducers = source.presettleTopicProducers; + this.presettleQueueProducers = source.presettleQueueProducers; + this.presettleTransactedProducers = source.presettleTransactedProducers; + } + + public JmsPresettlePolicy copy() { + return new JmsPresettlePolicy(this); + } + + /** + * @return the presettleAll setting for this policy + */ + public boolean isPresettleAll() { + return presettleAll; + } + + /** + * Sets the presettle all sends option. When true all MessageProducers + * will send their messages presettled. + * + * @param presettleAll + * the presettleAll value to apply. + */ + public void setPresettleAll(boolean presettleAll) { + this.presettleAll = presettleAll; + } + + /** + * @return the presettleProducers setting for this policy. + */ + public boolean isPresettleProducers() { + return presettleProducers; + } + + /** + * Sets the presettle all sends option. 
When true all MessageProducers that + * are created will send their messages as settled. + * + * @param presettleProducers + * the presettleProducers value to apply. + */ + public void setPresettleProducers(boolean presettleProducers) { + this.presettleProducers = presettleProducers; + } + + /** + * @return the presettleTopicProducers setting for this policy + */ + public boolean isPresettleTopicProducers() { + return presettleTopicProducers; + } + + /** + * Sets the presettle Topic sends option. When true any MessageProducer that + * is created that sends to a Topic will send its messages presettled, and any + * anonymous MessageProducer will send Messages that are sent to a Topic as + * presettled as well. + * + * @param presettleTopicProducers + * the presettleTopicProducers value to apply. + */ + public void setPresettleTopicProducers(boolean presettleTopicProducers) { + this.presettleTopicProducers = presettleTopicProducers; + } + + /** + * @return the presettleQueueSends setting for this policy + */ + public boolean isPresettleQueueProducers() { + return presettleQueueProducers; + } + + /** + * Sets the presettle Queue sends option. When true any MessageProducer that + * is created that sends to a Queue will send its messages presettled, and any + * anonymous MessageProducer will send Messages that are sent to a Queue as + * presettled as well. + * + * @param presettleQueueProducers + * the presettleQueueSends value to apply. + */ + public void setPresettleQueueProducers(boolean presettleQueueProducers) { + this.presettleQueueProducers = presettleQueueProducers; + } + + /** + * @return the presettleTransactedSends setting for this policy + */ + public boolean isPresettleTransactedProducers() { + return presettleTransactedProducers; + } + + /** + * Sets the presettle in transactions option. When true any MessageProducer that is + * operating inside of a transacted session will send its messages presettled. 
+ * + * @param presettleTransactedProducers the presettleTransactedSends to set + */ + public void setPresettleTransactedProducers(boolean presettleTransactedProducers) { + this.presettleTransactedProducers = presettleTransactedProducers; + } + + /** + * Determines when a producer will send messages presettled. + * <p> + * Called when a producer is being created to determine whether the producer will + * be configured to send all its messages as presettled or not. + * <p> + * For an anonymous producer this method is called on each send to allow the policy to + * be applied to the target destination that the message will be sent to. + * + * @param destination + * the destination that the producer will be sending to. + * @param session + * the session that owns the producer that will be sending a message. + * + * @return true if the producer should send presettled. + */ + public boolean isSendPresttled(JmsDestination destination, JmsSession session) { + + if (presettleAll || presettleProducers) { + return true; + } else if (session.isTransacted() && presettleTransactedProducers) { + return true; + } else if (destination != null && destination.isQueue() && presettleQueueProducers) { + return true; + } else if (destination != null && destination.isTopic() && presettleTopicProducers) { + return true; + } + + return false; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java index 395b982..c57e238 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsRedeliveryPolicy.java @@ -59,4 +59,34 @@ public class JmsRedeliveryPolicy { public void 
setMaxRedeliveries(int maxRedeliveries) { this.maxRedeliveries = maxRedeliveries; } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + maxRedeliveries; + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null) { + return false; + } + + if (getClass() != obj.getClass()) { + return false; + } + + JmsRedeliveryPolicy other = (JmsRedeliveryPolicy) obj; + if (maxRedeliveries != other.maxRedeliveries) { + return false; + } + + return true; + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index c00cdbd..a3e2b27 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -93,7 +93,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe private final AtomicBoolean started = new AtomicBoolean(); private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages = new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000); - private JmsPrefetchPolicy prefetchPolicy; + private final JmsPrefetchPolicy prefetchPolicy; + private final JmsPresettlePolicy presettlePolicy; private final JmsMessageIDBuilder messageIDBuilder; private final JmsSessionInfo sessionInfo; private volatile ExecutorService executor; @@ -108,7 +109,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException { this.connection = connection; this.acknowledgementMode = acknowledgementMode; - this.prefetchPolicy = 
new JmsPrefetchPolicy(connection.getPrefetchPolicy()); + this.prefetchPolicy = connection.getPrefetchPolicy().copy(); + this.presettlePolicy = connection.getPresettlePolicy().copy(); this.messageIDBuilder = connection.getMessageIDBuilder(); if (acknowledgementMode == SESSION_TRANSACTED) { @@ -696,6 +698,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe envelope.setSendAsync(!sync); envelope.setDispatchId(messageSequence); + if (producer.isAnonymous()) { + envelope.setPresettle(presettlePolicy.isSendPresttled(destination, this)); + } + transactionContext.send(connection, envelope); } finally { sendLock.unlock(); @@ -910,8 +916,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe return prefetchPolicy; } - public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) { - this.prefetchPolicy = prefetchPolicy; + public JmsPresettlePolicy getPresettlePolicy() { + return presettlePolicy; } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java index d97d33d..3af306d 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java @@ -20,7 +20,9 @@ import java.net.URI; import java.nio.charset.Charset; import org.apache.qpid.jms.JmsPrefetchPolicy; +import org.apache.qpid.jms.JmsPresettlePolicy; import org.apache.qpid.jms.JmsRedeliveryPolicy; +import org.apache.qpid.jms.message.JmsMessageIDBuilder; /** * Meta object that contains the JmsConnection identification and configuration @@ -59,6 +61,8 @@ public final class JmsConnectionInfo implements JmsResource, 
Comparable<JmsConne private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy(); private JmsRedeliveryPolicy redeliveryPolicy = new JmsRedeliveryPolicy(); + private JmsPresettlePolicy presettlePolicy = new JmsPresettlePolicy(); + private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder(); private volatile byte[] encodedUserId; @@ -89,6 +93,10 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne copy.topicPrefix = topicPrefix; copy.connectTimeout = connectTimeout; copy.validatePropertyNames = validatePropertyNames; + copy.messageIDBuilder = messageIDBuilder; + copy.prefetchPolicy = prefetchPolicy.copy(); + copy.redeliveryPolicy = redeliveryPolicy.copy(); + copy.presettlePolicy = presettlePolicy.copy(); } public boolean isForceAsyncSend() { @@ -264,6 +272,22 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne this.redeliveryPolicy = redeliveryPolicy.copy(); } + public JmsPresettlePolicy getPresettlePolicy() { + return presettlePolicy; + } + + public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) { + this.presettlePolicy = presettlePolicy; + } + + public JmsMessageIDBuilder getMessageIDBuilder() { + return messageIDBuilder; + } + + public void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) { + this.messageIDBuilder = messageIDBuilder; + } + public boolean isPopulateJMSXUserID() { return populateJMSXUserID; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java index 53939a9..8b1e019 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java +++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java @@ -22,6 +22,7 @@ public final class JmsProducerInfo implements JmsResource, Comparable<JmsProduce private final JmsProducerId producerId; private JmsDestination destination; + private boolean presettle; public JmsProducerInfo(JmsProducerId producerId) { if (producerId == null) { @@ -66,6 +67,24 @@ public final class JmsProducerInfo implements JmsResource, Comparable<JmsProduce this.destination = destination; } + /** + * @return the presettle mode of this producer. + */ + public boolean isPresettle() { + return presettle; + } + + /** + * Sets the presettle mode of the producer, when true the producer will be created + * as a presettled producer and all messages it sends will be settled before dispatch. + * + * @param presettle + * the presettle option to set on this producer. + */ + public void setPresettle(boolean presettle) { + this.presettle = presettle; + } + @Override public String toString() { return "JmsProducerInfo { " + getId() + ", destination = " + getDestination() + " }"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java index a115f72..71f26e7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpAnonymousFallbackProducer.java @@ -45,7 +45,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { private static final Logger LOG = LoggerFactory.getLogger(AmqpAnonymousFallbackProducer.class); private static final IdGenerator producerIdGenerator = 
new IdGenerator(); - private final AnonymousProducerCache producerCache = new AnonymousProducerCache(10); + private final AnonymousProducerCache producerCache; private final String producerIdKey = producerIdGenerator.generateId(); private long producerIdCount; @@ -61,7 +61,10 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { super(session, info); if (connection.isAnonymousProducerCache()) { + producerCache = new AnonymousProducerCache(10); producerCache.setMaxCacheSize(connection.getAnonymousProducerCacheSize()); + } else { + producerCache = null; } } @@ -79,6 +82,7 @@ public class AmqpAnonymousFallbackProducer extends AmqpProducer { // send to the given AMQP target. JmsProducerInfo info = new JmsProducerInfo(getNextProducerId()); info.setDestination(envelope.getDestination()); + info.setPresettle(this.getResourceInfo().isPresettle()); // We open a Fixed Producer instance with the target destination. Once it opens // it will trigger the open event which will in turn trigger the send event. 
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java index a7e818c..f284c8a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnection.java @@ -119,6 +119,7 @@ public class AmqpConnection extends AmqpAbstractResource<JmsConnectionInfo, Conn return remoteURI; } + @Override public AmqpProvider getProvider() { return provider; } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java index ef526c9..48e9d72 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java @@ -43,6 +43,7 @@ import org.apache.qpid.proton.amqp.messaging.Released; import org.apache.qpid.proton.amqp.transaction.TransactionalState; import org.apache.qpid.proton.amqp.transport.DeliveryState; import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.message.Message; @@ -63,7 +64,6 @@ public class AmqpFixedProducer extends AmqpProducer { private final Set<Delivery> sent = new 
LinkedHashSet<Delivery>(); private final LinkedList<InFlightSend> blocked = new LinkedList<InFlightSend>(); private byte[] encodeBuffer = new byte[1024 * 8]; - private boolean presettle = false; public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) { super(session, info); @@ -287,13 +287,8 @@ public class AmqpFixedProducer extends AmqpProducer { } @Override - public void setPresettle(boolean presettle) { - this.presettle = presettle; - } - - @Override public boolean isPresettle() { - return presettle; + return getEndpoint().getSenderSettleMode() == SenderSettleMode.SETTLED; } public long getSendTimeout() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java index 74fe18d..0e0f3f4 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/builders/AmqpProducerBuilder.java @@ -66,7 +66,7 @@ public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpS Sender sender = getParent().getEndpoint().sender(senderName); sender.setSource(source); sender.setTarget(target); - if (getParent().getConnection().isPresettleProducers()) { + if (resourceInfo.isPresettle() || getParent().getConnection().isPresettleProducers()) { sender.setSenderSettleMode(SenderSettleMode.SETTLED); } else { sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); @@ -78,13 +78,7 @@ public class AmqpProducerBuilder extends AmqpResourceBuilder<AmqpProducer, AmqpS @Override protected AmqpProducer createResource(AmqpSession parent, JmsProducerInfo resourceInfo, Sender 
endpoint) { - AmqpProducer producer = new AmqpFixedProducer(getParent(), getResourceInfo(), endpoint); - - if (getParent().getConnection().isPresettleProducers()) { - producer.setPresettle(true); - } - - return producer; + return new AmqpFixedProducer(getParent(), getResourceInfo(), endpoint); } @Override http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java new file mode 100644 index 0000000..e7b70ef --- /dev/null +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/PresettledProducerIntegrationTest.java @@ -0,0 +1,563 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.qpid.jms.integration; + +import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY; +import static org.hamcrest.Matchers.arrayContaining; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.nullValue; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageProducer; +import javax.jms.Session; + +import org.apache.qpid.jms.test.QpidJmsTestCase; +import org.apache.qpid.jms.test.testpeer.ListDescribedType; +import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; +import org.apache.qpid.jms.test.testpeer.describedtypes.Accepted; +import org.apache.qpid.jms.test.testpeer.describedtypes.TransactionalState; +import org.apache.qpid.jms.test.testpeer.matchers.CoordinatorMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.TransactionalStateMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageAnnotationsSectionMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.sections.MessageHeaderSectionMatcher; +import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.transaction.TxnCapability; +import org.hamcrest.Matcher; +import org.junit.Test; + +/** + * Tests MessageProducers created under the various presettle configuration options + * (the jms.presettlePolicy.* URI options and amqp.presettleProducers), for queues, topics, + * temporary destinations and anonymous producers. + */ +public class PresettledProducerIntegrationTest extends QpidJmsTestCase { + + private final IntegrationTestFixture testFixture = new IntegrationTestFixture(); + + private final Symbol[] serverCapabilities = new Symbol[] { ANONYMOUS_RELAY }; + + //----- Test the jms.presettlePolicy.presettleAll option -----------------// + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleAllSendToTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = 
testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleAllSendToQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleAllSendToTempTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleAllSendToTempQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleAllAnonymousSendToTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, true, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleAllAnonymousSendToQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true", serverCapabilities, null); 
+ doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, false, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleAllAnonymousSendToTempTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, true, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleAllAnonymousSendToTempQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleAll=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, false, true); + } + } + + //----- Test the amqp.presettleProducers option --------------------------// + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedToTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, false); + } + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedToQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, false); + } + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedToTempTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, 
"?amqp.presettleProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, true); + } + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedToTempQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, true); + } + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedAnonymousSendToTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, true, false); + } + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedAnonymousSendToQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, false, false); + } + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedAnonymousSendToTempTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, true, true); + } + } + + @Test(timeout = 20000) + public void testPresettledProducersConfigurationAppliedAnonymousSendToTempQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, 
"?amqp.presettleProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, false, true); + } + } + + //----- Test the jms.presettlePolicy.presettleProducers option -----------// + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleProducersTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleProducersQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleProducersTempTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleProducersTempQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleProducersAnonymousTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, 
"?jms.presettlePolicy.presettleProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, true, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleProducersAnonymousQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, false, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleProducersAnonymousTempTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, true, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleProducersAnonymousTempQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, true, true, false, true); + } + } + + //----- Test the jms.presettlePolicy.presettleTopicProducers option ------// + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTopicProducersTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, false); + } + } + + @Test(timeout = 20000) + public void 
testJmsPresettlePolicyPresettleTopicProducersQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, false, false, false, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTopicProducersTempTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTopicProducersTempQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, false, false, false, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTopicProducersAnonymousTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, true, true, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTopicProducersAnonymousQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, false, false, false); + } + } + + @Test(timeout = 
20000) + public void testJmsPresettlePolicyPresettleTopicProducersAnonymousTempTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, true, true, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTopicProducersAnonymousTempQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTopicProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, false, false, true); + } + } + + //----- Test the jms.presettlePolicy.presettleQueueProducers option ------// + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleQueueProducersTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, false, false, true, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleQueueProducersQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleQueueProducersTempTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true"); + 
doTestProducerWithPresettleOptions(testPeer, connection, false, false, true, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleQueueProducersTempQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, false, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleQueueProducersAnonymousTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, false, true, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleQueueProducersAnonymousQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, true, false, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleQueueProducersAnonymousTempTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, false, true, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleQueueProducersAnonymousTempQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = 
testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleQueueProducers=true", serverCapabilities, null); + doTestProducerWithPresettleOptions(testPeer, connection, false, true, false, true, false, true); + } + } + + //----- Test the jms.presettlePolicy.presettleTransactedProducers option -// + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTransactedProducersTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, true, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTransactedProducersQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, false, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTransactedProducersTempTopic() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, true, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTransactedProducersTempQueue() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, true, true, true, false, true); + } + } + + @Test(timeout = 20000) + public void 
testJmsPresettlePolicyPresettleTransactedProducersTopicNoTX() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, false, false, false, true, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTransactedProducersQueueNoTX() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, false, false, false, false, false); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTransactedProducersTempTopicNoTX() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, false, false, false, true, true); + } + } + + @Test(timeout = 20000) + public void testJmsPresettlePolicyPresettleTransactedProducersTempQueueNoTX() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + Connection connection = testFixture.establishConnecton(testPeer, "?jms.presettlePolicy.presettleTransactedProducers=true"); + doTestProducerWithPresettleOptions(testPeer, connection, false, false, false, false, true); + } + } + + //----- Test Method implementation ---------------------------------------// + + private void doTestProducerWithPresettleOptions(TestAmqpPeer testPeer, Connection connection, boolean senderSettled, boolean transferSettled, boolean topic, boolean temporary) throws Exception { + doTestProducerWithPresettleOptions(testPeer, connection, false, senderSettled, transferSettled, topic, temporary); + } + + private 
void doTestProducerWithPresettleOptions(TestAmqpPeer testPeer, Connection connection, boolean transacted, boolean senderSettled, boolean transferSettled, boolean topic, boolean temporary) throws Exception { + doTestProducerWithPresettleOptions(testPeer, connection, transacted, false, senderSettled, transferSettled, topic, temporary); + } + + /** + * Scripts the test peer for one producer scenario, then creates the session, destination and + * producer, sends a single message and closes the connection. + * + * @param testPeer the scripted peer that the connection is attached to + * @param connection the already established connection under test + * @param transacted create a transacted session; the peer then also expects the coordinator attach, the declare and a discharge + * @param anonymous create the producer with a null destination and pass the destination on send + * @param senderSettled register expectSettledSenderAttach() instead of expectSenderAttach() for the producer link + * @param transferSettled selects which of the two expectTransfer(...) expectations is registered for the message + * @param topic use a topic rather than a queue + * @param temporary use a temporary destination created through the session + * @throws Exception if any JMS call or peer expectation fails + */ + private void doTestProducerWithPresettleOptions(TestAmqpPeer testPeer, Connection connection, boolean transacted, boolean anonymous, boolean senderSettled, boolean transferSettled, boolean topic, boolean temporary) throws Exception { + testPeer.expectBegin(); + + Session session = null; + Binary txnId = null; + + if (transacted) { + // Expect the session, with an immediate link to the transaction coordinator + // using a target with the expected capabilities only. + CoordinatorMatcher txCoordinatorMatcher = new CoordinatorMatcher(); + txCoordinatorMatcher.withCapabilities(arrayContaining(TxnCapability.LOCAL_TXN)); + testPeer.expectSenderAttach(txCoordinatorMatcher, false, false); + + // First expect an unsettled 'declare' transfer to the txn coordinator, and + // reply with a declared disposition state containing the txnId. 
+ txnId = new Binary(new byte[]{ (byte) 1, (byte) 2, (byte) 3, (byte) 4}); + testPeer.expectDeclare(txnId); + + session = connection.createSession(true, Session.SESSION_TRANSACTED); + } else { + session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + } + + Destination destination = null; + if (topic) { + if (temporary) { + String dynamicAddress = "myTempTopicAddress"; + testPeer.expectTempTopicCreationAttach(dynamicAddress); + destination = session.createTemporaryTopic(); + } else { + destination = session.createTopic("myTopic"); + } + } else { + if (temporary) { + String dynamicAddress = "myTempQueueAddress"; + testPeer.expectTempQueueCreationAttach(dynamicAddress); + destination = session.createTemporaryQueue(); + } else { + // Only the non-temporary case creates a named queue, so a temporary queue created above is kept. + destination = session.createQueue("myQueue"); + } + } + + if (senderSettled) { + testPeer.expectSettledSenderAttach(); + } else { + testPeer.expectSenderAttach(); + } + + MessageProducer producer = null; + if (anonymous) { + producer = session.createProducer(null); + } else { + producer = session.createProducer(destination); + } + + // Create and transfer a new message + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true); + headersMatcher.withDurable(equalTo(true)); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + + Matcher<?> stateMatcher = nullValue(); + if (transacted) { + stateMatcher = new TransactionalStateMatcher(); + ((TransactionalStateMatcher) stateMatcher).withTxnId(equalTo(txnId)); + ((TransactionalStateMatcher) stateMatcher).withOutcome(nullValue()); + } + + ListDescribedType responseState = new Accepted(); + if (transacted) { + TransactionalState txState = new 
TransactionalState(); + txState.setTxnId(txnId); + txState.setOutcome(new Accepted()); + // Hand the transactional state to expectTransfer as the response state for transacted sends. + responseState = txState; + } + + if (transferSettled) { + testPeer.expectTransfer(messageMatcher, stateMatcher, true, false, responseState, false); + } else { + testPeer.expectTransfer(messageMatcher, stateMatcher, false, true, responseState, true); + } + + Message message = session.createTextMessage(); + + if (anonymous) { + producer.send(destination, message); + } else { + producer.send(message); + } + + if (transacted) { + testPeer.expectDischarge(txnId, true); + } + + testPeer.expectClose(); + + connection.close(); + + testPeer.waitForAllHandlersToComplete(1000); + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/bac662d5/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index 9625cd1..2e49080 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -1602,36 +1602,4 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { testPeer.waitForAllHandlersToComplete(1000); } } - - @Test(timeout = 20000) - public void testPresettledProducersConfigurationApplied() throws Exception { - try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - Connection connection = testFixture.establishConnecton(testPeer, "?amqp.presettleProducers=true"); - testPeer.expectBegin(); - testPeer.expectSettledSenderAttach(); - - Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); - Queue queue = session.createQueue("myQueue"); - MessageProducer producer = session.createProducer(queue); - - // Create and transfer a new message - 
MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true) - .withDurable(equalTo(true)); - MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); - TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); - messageMatcher.setHeadersMatcher(headersMatcher); - messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); - testPeer.expectTransfer(messageMatcher, nullValue(), true, false, null, false); - testPeer.expectClose(); - - Message message = session.createTextMessage(); - - producer.send(message); - assertEquals(DeliveryMode.PERSISTENT, message.getJMSDeliveryMode()); - - connection.close(); - - testPeer.waitForAllHandlersToComplete(1000); - } - } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
