Repository: qpid-jms Updated Branches: refs/heads/master 0251ae8ce -> 95bc1aa80
QPIDJMS-180 Add JmsMessageIDPolicy and a default implementation that preserve previous behavior. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/95bc1aa8 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/95bc1aa8 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/95bc1aa8 Branch: refs/heads/master Commit: 95bc1aa809b406c16e31802cf66c863cb0686a53 Parents: 0251ae8 Author: Timothy Bish <[email protected]> Authored: Tue May 24 14:15:11 2016 -0400 Committer: Timothy Bish <[email protected]> Committed: Tue May 24 14:15:11 2016 -0400 ---------------------------------------------------------------------- .../java/org/apache/qpid/jms/JmsConnection.java | 16 +- .../apache/qpid/jms/JmsConnectionFactory.java | 75 ++++- .../org/apache/qpid/jms/JmsMessageProducer.java | 11 +- .../java/org/apache/qpid/jms/JmsSession.java | 5 +- .../apache/qpid/jms/meta/JmsConnectionInfo.java | 15 +- .../apache/qpid/jms/meta/JmsProducerInfo.java | 24 +- .../jms/policy/JmsDefaultMessageIDPolicy.java | 81 +++++ .../jms/policy/JmsDefaultPresettlePolicy.java | 2 +- .../jms/policy/JmsDefaultRedeliveryPolicy.java | 2 +- .../qpid/jms/policy/JmsMessageIDPolicy.java | 48 +++ .../qpid/jms/JmsConnectionFactoryTest.java | 28 ++ .../ConnectionFactoryIntegrationTest.java | 335 +++++++++++++++++-- .../integration/ProducerIntegrationTest.java | 112 ++++++- .../qpid/jms/meta/JmsProducerInfoTest.java | 39 +-- 14 files changed, 703 insertions(+), 90 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java index 38aa2fc..7375c3f 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java @@ -54,7 +54,6 @@ import org.apache.qpid.jms.exceptions.JmsExceptionSupport; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.message.JmsMessage; import org.apache.qpid.jms.message.JmsMessageFactory; -import org.apache.qpid.jms.message.JmsMessageIDBuilder; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import org.apache.qpid.jms.meta.JmsConnectionId; import org.apache.qpid.jms.meta.JmsConnectionInfo; @@ -67,6 +66,7 @@ import org.apache.qpid.jms.meta.JmsSessionId; import org.apache.qpid.jms.meta.JmsSessionInfo; import org.apache.qpid.jms.meta.JmsTransactionId; import org.apache.qpid.jms.meta.JmsTransactionInfo; +import org.apache.qpid.jms.policy.JmsMessageIDPolicy; import org.apache.qpid.jms.policy.JmsPrefetchPolicy; import org.apache.qpid.jms.policy.JmsPresettlePolicy; import org.apache.qpid.jms.policy.JmsRedeliveryPolicy; @@ -123,8 +123,8 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection // not have it's own mechanism for doing so. executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() { @Override - public Thread newThread(Runnable r) { - Thread thread = new Thread(r, "QpidJMS Connection Executor: " + connectionId); + public Thread newThread(Runnable target) { + Thread thread = new Thread(target, "QpidJMS Connection Executor: " + connectionId); thread.setDaemon(false); return thread; } @@ -1012,12 +1012,12 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection connectionInfo.setLocalMessageExpiry(localMessageExpiry); } - public JmsMessageIDBuilder getMessageIDBuilder() { - return connectionInfo.getMessageIDBuilder(); + public JmsMessageIDPolicy getMessageIDPolicy() { + return connectionInfo.getMessageIDPolicy(); } - void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) { - connectionInfo.setMessageIDBuilder(messageIDBuilder); + public void setMessageIDPolicy(JmsMessageIDPolicy messageIDPolicy) { + connectionInfo.setMessageIDPolicy(messageIDPolicy); } public boolean isPopulateJMSXUserID() { @@ -1154,7 +1154,7 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection onProviderException(ex); - for(AsyncResult request : requests.keySet()) { + for (AsyncResult request : requests.keySet()) { try { request.onFailure(ex); } catch (Exception e) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java index f874193..32cb39a 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java @@ -35,9 +35,11 @@ import org.apache.qpid.jms.exceptions.JmsExceptionSupport; import org.apache.qpid.jms.jndi.JNDIStorable; import org.apache.qpid.jms.message.JmsMessageIDBuilder; import org.apache.qpid.jms.meta.JmsConnectionInfo; +import org.apache.qpid.jms.policy.JmsDefaultMessageIDPolicy; import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.apache.qpid.jms.policy.JmsDefaultPresettlePolicy; import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy; +import org.apache.qpid.jms.policy.JmsMessageIDPolicy; import org.apache.qpid.jms.policy.JmsPrefetchPolicy; import org.apache.qpid.jms.policy.JmsPresettlePolicy; import org.apache.qpid.jms.policy.JmsRedeliveryPolicy; @@ -93,7 +95,7 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact private JmsPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy(); private JmsRedeliveryPolicy redeliveryPolicy = new JmsDefaultRedeliveryPolicy(); private JmsPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy(); - private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder(); + private JmsMessageIDPolicy messageIDPolicy = new JmsDefaultMessageIDPolicy(); public JmsConnectionFactory() { } @@ -208,9 +210,15 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact properties.remove(CLIENT_ID_PROP); } + // Copy the configured policies before applying URI options that + // might make additional configuration changes. + connection.setMessageIDPolicy(messageIDPolicy.copy()); + connection.setPrefetchPolicy(prefetchPolicy.copy()); + connection.setPresettlePolicy(presettlePolicy.copy()); + connection.setRedeliveryPolicy(redeliveryPolicy.copy()); + PropertyUtil.setProperties(connection, properties); connection.setExceptionListener(exceptionListener); - connection.setMessageIDBuilder(messageIDBuilder); connection.setUsername(username); connection.setPassword(password); connection.setConfiguredURI(remoteURI); @@ -500,6 +508,10 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact } public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) { + if (prefetchPolicy == null) { + prefetchPolicy = new JmsDefaultPrefetchPolicy(); + } + this.prefetchPolicy = prefetchPolicy; } @@ -519,6 +531,9 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact * The new redeliveryPolicy to set */ public void setRedeliveryPolicy(JmsRedeliveryPolicy redeliveryPolicy) { + if (redeliveryPolicy == null) { + redeliveryPolicy = new JmsDefaultRedeliveryPolicy(); + } this.redeliveryPolicy = redeliveryPolicy; } @@ -536,10 +551,35 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact * the presettlePolicy to use by connections created from this factory. */ public void setPresettlePolicy(JmsPresettlePolicy presettlePolicy) { + if (presettlePolicy == null) { + presettlePolicy = new JmsDefaultPresettlePolicy(); + } this.presettlePolicy = presettlePolicy; } /** + * @return the messageIDPolicy that is currently configured. + */ + public JmsMessageIDPolicy getMessageIDPolicy() { + return messageIDPolicy; + } + + /** + * Sets the JmsMessageIDPolicy that is use to configure the JmsMessageIDBuilder + * that is assigned to any new MessageProducer created from Connection instances + * that this factory has created. + * + * @param messageIDPolicy + * the messageIDPolicy to use by connections created from this factory. + */ + public void setMessageIDPolicy(JmsMessageIDPolicy messageIDPolicy) { + if (messageIDPolicy == null) { + messageIDPolicy = new JmsDefaultMessageIDPolicy(); + } + this.messageIDPolicy = messageIDPolicy; + } + + /** * @return the currently configured client ID prefix for auto-generated client IDs. */ public synchronized String getClientIDPrefix() { @@ -667,22 +707,39 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact /** * Sets the type of the Message IDs used to populate the outgoing Messages * + * @deprecated use the jms.messageIDPolicy.messageIDType URI setting instead. + * * @param type * The name of the Message type to use when sending a message. */ + @Deprecated public void setMessageIDType(String type) { - this.messageIDBuilder = JmsMessageIDBuilder.BUILTIN.create(type); + if (messageIDPolicy instanceof JmsDefaultMessageIDPolicy) { + ((JmsDefaultMessageIDPolicy) messageIDPolicy).setMessageIDType(type); + } } + @Deprecated public String getMessageIDType() { - return this.messageIDBuilder.toString(); + if (messageIDPolicy instanceof JmsDefaultMessageIDPolicy) { + return ((JmsDefaultMessageIDPolicy) this.messageIDPolicy).getMessageIDType(); + } + + return null; } /** * @return the messageIDBuilder currently configured. + * + * @deprecated Create a custom JmsMessageIDPolicy to control the JmsMessageIDBuilder */ + @Deprecated public JmsMessageIDBuilder getMessageIDBuilder() { - return messageIDBuilder; + if (messageIDPolicy instanceof JmsDefaultMessageIDPolicy) { + return ((JmsDefaultMessageIDPolicy) this.messageIDPolicy).getMessageIDBuilder(); + } + + return null; } /** @@ -692,12 +749,14 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact * * @param messageIDBuilder * The custom JmsMessageIDBuilder to use to create outgoing Message IDs. + * + * @deprecated Create a custom JmsMessageIDPolicy to control the JmsMessageIDBuilder */ + @Deprecated public void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) { - if (messageIDBuilder == null) { - messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder(); + if (messageIDPolicy instanceof JmsDefaultMessageIDPolicy) { + ((JmsDefaultMessageIDPolicy) this.messageIDPolicy).setMessageIDBuilder(messageIDBuilder); } - this.messageIDBuilder = messageIDBuilder; } public boolean isReceiveLocalOnly() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java index c077230..89165e4 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java @@ -28,6 +28,7 @@ import javax.jms.JMSException; import javax.jms.Message; import javax.jms.MessageProducer; +import org.apache.qpid.jms.message.JmsMessageIDBuilder; import org.apache.qpid.jms.meta.JmsProducerId; import org.apache.qpid.jms.meta.JmsProducerInfo; import org.apache.qpid.jms.provider.Provider; @@ -55,7 +56,11 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { this.session = session; this.connection = session.getConnection(); this.anonymousProducer = destination == null; - this.producerInfo = new JmsProducerInfo(producerId); + + JmsMessageIDBuilder messageIDBuilder = + session.getConnection().getMessageIDPolicy().getMessageIDBuilder(session, destination); + + this.producerInfo = new JmsProducerInfo(producerId, messageIDBuilder); this.producerInfo.setDestination(destination); this.producerInfo.setPresettle(session.getPresettlePolicy().isProducerPresttled(session, destination)); @@ -241,6 +246,10 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer { return anonymousProducer; } + protected JmsMessageIDBuilder getMessageIDBuilder() { + return producerInfo.getMessageIDBuilder(); + } + void setFailureCause(Exception failureCause) { this.failureCause.set(failureCause); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java index 66d9539..b2f87ed 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java @@ -60,7 +60,6 @@ import javax.jms.TopicSubscriber; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.message.JmsMessage; -import org.apache.qpid.jms.message.JmsMessageIDBuilder; import org.apache.qpid.jms.message.JmsMessageTransformation; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; import org.apache.qpid.jms.meta.JmsConsumerId; @@ -97,7 +96,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000); private final JmsPrefetchPolicy prefetchPolicy; private final JmsPresettlePolicy presettlePolicy; - private final JmsMessageIDBuilder messageIDBuilder; private final JmsSessionInfo sessionInfo; private volatile ExecutorService executor; private final ReentrantLock sendLock = new ReentrantLock(); @@ -113,7 +111,6 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe this.acknowledgementMode = acknowledgementMode; this.prefetchPolicy = connection.getPrefetchPolicy().copy(); this.presettlePolicy = connection.getPresettlePolicy().copy(); - this.messageIDBuilder = connection.getMessageIDBuilder(); if (acknowledgementMode == SESSION_TRANSACTED) { setTransactionContext(new JmsLocalTransactionContext(this)); @@ -664,7 +661,7 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe long messageSequence = producer.getNextMessageSequence(); Object messageId = null; if (!disableMsgId) { - messageId = messageIDBuilder.createMessageID(producer.getProducerId().toString(), messageSequence); + messageId = producer.getMessageIDBuilder().createMessageID(producer.getProducerId().toString(), messageSequence); } JmsMessage copy = null; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java index 14661ca..f6bf6a7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsConnectionInfo.java @@ -19,10 +19,11 @@ package org.apache.qpid.jms.meta; import java.net.URI; import java.nio.charset.Charset; -import org.apache.qpid.jms.message.JmsMessageIDBuilder; +import org.apache.qpid.jms.policy.JmsDefaultMessageIDPolicy; import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; import org.apache.qpid.jms.policy.JmsDefaultPresettlePolicy; import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy; +import org.apache.qpid.jms.policy.JmsMessageIDPolicy; import org.apache.qpid.jms.policy.JmsPrefetchPolicy; import org.apache.qpid.jms.policy.JmsPresettlePolicy; import org.apache.qpid.jms.policy.JmsRedeliveryPolicy; @@ -65,7 +66,7 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne private JmsPrefetchPolicy prefetchPolicy = new JmsDefaultPrefetchPolicy(); private JmsRedeliveryPolicy redeliveryPolicy = new JmsDefaultRedeliveryPolicy(); private JmsPresettlePolicy presettlePolicy = new JmsDefaultPresettlePolicy(); - private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder(); + private JmsMessageIDPolicy messageIDPolicy = new JmsDefaultMessageIDPolicy(); private volatile byte[] encodedUserId; @@ -96,7 +97,7 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne copy.topicPrefix = topicPrefix; copy.connectTimeout = connectTimeout; copy.validatePropertyNames = validatePropertyNames; - copy.messageIDBuilder = messageIDBuilder; + copy.messageIDPolicy = messageIDPolicy; copy.prefetchPolicy = prefetchPolicy.copy(); copy.redeliveryPolicy = redeliveryPolicy.copy(); copy.presettlePolicy = presettlePolicy.copy(); @@ -283,12 +284,12 @@ public final class JmsConnectionInfo implements JmsResource, Comparable<JmsConne this.presettlePolicy = presettlePolicy; } - public JmsMessageIDBuilder getMessageIDBuilder() { - return messageIDBuilder; + public JmsMessageIDPolicy getMessageIDPolicy() { + return messageIDPolicy; } - public void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) { - this.messageIDBuilder = messageIDBuilder; + public void setMessageIDPolicy(JmsMessageIDPolicy messageIDPolicy) { + this.messageIDPolicy = messageIDPolicy; } public boolean isPopulateJMSXUserID() { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java index 8b1e019..1b006d0 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/meta/JmsProducerInfo.java @@ -17,31 +17,35 @@ package org.apache.qpid.jms.meta; import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.message.JmsMessageIDBuilder; public final class JmsProducerInfo implements JmsResource, Comparable<JmsProducerInfo> { private final JmsProducerId producerId; + private final JmsMessageIDBuilder messageIDBuilder; + private JmsDestination destination; private boolean presettle; public JmsProducerInfo(JmsProducerId producerId) { + this(producerId, JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder()); + } + + public JmsProducerInfo(JmsProducerId producerId, JmsMessageIDBuilder messageIDBuilder) { if (producerId == null) { throw new IllegalArgumentException("Producer ID cannot be null"); } - this.producerId = producerId; - } - - public JmsProducerInfo(JmsSessionInfo sessionInfo, long producerId) { - if (sessionInfo == null) { - throw new IllegalArgumentException("Parent Session Info object cannot be null"); + if (messageIDBuilder == null) { + throw new IllegalArgumentException("Message ID Builder cannot be null"); } - this.producerId = new JmsProducerId(sessionInfo.getId(), producerId); + this.producerId = producerId; + this.messageIDBuilder = messageIDBuilder; } public JmsProducerInfo copy() { - JmsProducerInfo info = new JmsProducerInfo(producerId); + JmsProducerInfo info = new JmsProducerInfo(producerId, messageIDBuilder); copy(info); return info; } @@ -85,6 +89,10 @@ public final class JmsProducerInfo implements JmsResource, Comparable<JmsProduce this.presettle = presettle; } + public JmsMessageIDBuilder getMessageIDBuilder() { + return messageIDBuilder; + } + @Override public String toString() { return "JmsProducerInfo { " + getId() + ", destination = " + getDestination() + " }"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultMessageIDPolicy.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultMessageIDPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultMessageIDPolicy.java new file mode 100644 index 0000000..2051561 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultMessageIDPolicy.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.policy; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.JmsSession; +import org.apache.qpid.jms.message.JmsMessageIDBuilder; + +/** + * The default MessageID policy used for all MessageProducers created from the + * client's connection factory. + */ +public class JmsDefaultMessageIDPolicy implements JmsMessageIDPolicy { + + private JmsMessageIDBuilder messageIDBuilder = JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder(); + + /** + * Initialize default Message ID builder policy + */ + public JmsDefaultMessageIDPolicy() { + } + + /** + * Creates a new JmsDefaultMessageIDPolicy instance copied from the source policy. + * + * @param source + * The policy instance to copy values from. + */ + public JmsDefaultMessageIDPolicy(JmsDefaultMessageIDPolicy source) { + this.messageIDBuilder = source.messageIDBuilder; + } + + @Override + public JmsDefaultMessageIDPolicy copy() { + return new JmsDefaultMessageIDPolicy(this); + } + + @Override + public JmsMessageIDBuilder getMessageIDBuilder(JmsSession session, JmsDestination destination) { + return messageIDBuilder; + } + + /** + * Sets the type of the Message IDs used to populate the outgoing Messages + * + * @param type + * The name of the Message type to use when sending a message. + */ + public void setMessageIDType(String type) { + this.messageIDBuilder = JmsMessageIDBuilder.BUILTIN.create(type); + } + + /** + * @return the type name of the configured JmsMessageIDBuilder. + */ + public String getMessageIDType() { + return this.messageIDBuilder.toString(); + } + + public JmsMessageIDBuilder getMessageIDBuilder() { + return messageIDBuilder; + } + + public void setMessageIDBuilder(JmsMessageIDBuilder messageIDBuilder) { + this.messageIDBuilder = messageIDBuilder; + } +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java index 8560750..2dc9f60 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultPresettlePolicy.java @@ -51,7 +51,7 @@ public class JmsDefaultPresettlePolicy implements JmsPresettlePolicy { } @Override - public JmsPresettlePolicy copy() { + public JmsDefaultPresettlePolicy copy() { return new JmsDefaultPresettlePolicy(this); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java index bf885ea..e9869b3 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsDefaultRedeliveryPolicy.java @@ -36,7 +36,7 @@ public class JmsDefaultRedeliveryPolicy implements JmsRedeliveryPolicy { } @Override - public JmsRedeliveryPolicy copy() { + public JmsDefaultRedeliveryPolicy copy() { return new JmsDefaultRedeliveryPolicy(this); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsMessageIDPolicy.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsMessageIDPolicy.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsMessageIDPolicy.java new file mode 100644 index 0000000..e519fc0 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/policy/JmsMessageIDPolicy.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.qpid.jms.policy; + +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.JmsSession; +import org.apache.qpid.jms.message.JmsMessageIDBuilder; + +/** + * Interface for a policy that controls what kind of MessageID type is used for Messages + * sent to a specific destination. + */ +public interface JmsMessageIDPolicy { + + /** + * Copy this policy into a newly allocated instance. + * + * @return a new JmsMessageIDPolicy that is a copy of this one. + */ + JmsMessageIDPolicy copy(); + + /** + * Returns the JmsMessageIDBuilder that should be used with the producer being created. + * + * @param session + * the Session that own the MessageProducer being created. + * @param destination + * the Destination that the consumer will be subscribed to. + * + * @return the JmsMessageIDBuilder instance that is assigned to the new producer. + */ + JmsMessageIDBuilder getMessageIDBuilder(JmsSession session, JmsDestination destination); + +} http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java index e5083cf..78e3164 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/JmsConnectionFactoryTest.java @@ -370,6 +370,34 @@ public class JmsConnectionFactoryTest extends QpidJmsTestCase { } @Test + public void testSetRemoteURIThrowsOnNullURI() throws Exception { + JmsConnectionFactory cf = new JmsConnectionFactory(); + try { + cf.setRemoteURI(null); + fail("Should not allow a null URI to be set."); + } catch (IllegalArgumentException e) { + } + } + + @Test + public void testCreateWithNullURIRemoteURIThrows() throws Exception { + try { + new JmsConnectionFactory("user", "pass", (URI) null); + fail("Should not allow a null URI to be set."); + } catch (NullPointerException e) { + } + } + + @Test + public void testCreateWithNullURIStringRemoteURIThrows() throws Exception { + try { + new JmsConnectionFactory("user", "pass", (String) null); + fail("Should not allow a null URI to be set."); + } catch (IllegalArgumentException e) { + } + } + + @Test public void testSerializeTwoConnectionFactories() throws Exception { String uri = "amqp://localhost:1234"; http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java index 016f188..cccb05e 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ConnectionFactoryIntegrationTest.java @@ -22,6 +22,7 @@ package org.apache.qpid.jms.integration; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -29,11 +30,23 @@ import java.net.URI; import java.util.UUID; import javax.jms.Connection; +import javax.jms.QueueConnection; +import javax.jms.TopicConnection; import org.apache.qpid.jms.JmsConnection; import org.apache.qpid.jms.JmsConnectionFactory; +import org.apache.qpid.jms.JmsDestination; +import org.apache.qpid.jms.JmsSession; import org.apache.qpid.jms.message.JmsMessageIDBuilder; import org.apache.qpid.jms.message.JmsMessageIDBuilder.BUILTIN; +import org.apache.qpid.jms.policy.JmsDefaultMessageIDPolicy; +import org.apache.qpid.jms.policy.JmsDefaultPrefetchPolicy; +import org.apache.qpid.jms.policy.JmsDefaultPresettlePolicy; +import org.apache.qpid.jms.policy.JmsDefaultRedeliveryPolicy; +import org.apache.qpid.jms.policy.JmsMessageIDPolicy; +import org.apache.qpid.jms.policy.JmsPrefetchPolicy; +import org.apache.qpid.jms.policy.JmsPresettlePolicy; +import org.apache.qpid.jms.policy.JmsRedeliveryPolicy; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.junit.Test; @@ -44,19 +57,6 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase { private static final Logger LOG = LoggerFactory.getLogger(ConnectionFactoryIntegrationTest.class); - private final class TestJmsMessageIdBuilder implements JmsMessageIDBuilder { - - @Override - public Object createMessageID(String producerId, long messageSequence) { - return UUID.randomUUID(); - } - - @Override - public String toString() { - return "TEST"; - } - } - @Test(timeout=20000) public void testCreateConnectionGoodProviderURI() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { @@ -80,6 +80,28 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase { } @Test(timeout=20000) + public void testTopicCreateConnectionGoodProviderString() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // DONT create a test fixture, we will drive everything directly. + JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:" + testPeer.getServerPort()); + TopicConnection connection = factory.createTopicConnection(); + assertNotNull(connection); + connection.close(); + } + } + + @Test(timeout=20000) + public void testCreateQueueConnectionGoodProviderString() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // DONT create a test fixture, we will drive everything directly. + JmsConnectionFactory factory = new JmsConnectionFactory("amqp://127.0.0.1:" + testPeer.getServerPort()); + QueueConnection connection = factory.createQueueConnection(); + assertNotNull(connection); + connection.close(); + } + } + + @Test(timeout=20000) public void testUriOptionsAppliedToConnection() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { // DONT create a test fixture, we will drive everything directly. @@ -114,7 +136,7 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase { public void testSetInvalidMessageIDFormatOption() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { // DONT create a test fixture, we will drive everything directly. - String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDType=UNKNOWN"; + String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=UNKNOWN"; try { new JmsConnectionFactory(uri); fail("Should not be able to create a factory with invalid id type option value."); @@ -124,14 +146,16 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase { } } + // TODO - Remove once the deprecated methods are removed. @Test(timeout=20000) - public void testSetMessageIDFormatOptionAlteredCase() throws Exception { + public void testSetMessageIDFormatOptionAlteredCaseLegacy() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { // DONT create a test fixture, we will drive everything directly. try { String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDType=uuid"; JmsConnectionFactory factory = new JmsConnectionFactory(uri); - assertEquals(JmsMessageIDBuilder.BUILTIN.UUID.name(), factory.getMessageIDType()); + JmsDefaultMessageIDPolicy policy = (JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy(); + assertEquals(JmsMessageIDBuilder.BUILTIN.UUID.name(), policy.getMessageIDType()); } catch (Exception ex) { fail("Should have succeeded in creating factory"); } @@ -139,15 +163,17 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase { try { String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDType=Uuid"; JmsConnectionFactory factory = new JmsConnectionFactory(uri); - assertEquals(JmsMessageIDBuilder.BUILTIN.UUID.name(), factory.getMessageIDType()); + JmsDefaultMessageIDPolicy policy = (JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy(); + assertEquals(JmsMessageIDBuilder.BUILTIN.UUID.name(), policy.getMessageIDType()); } catch (Exception ex) { fail("Should have succeeded in creating factory"); } } } + // TODO - Remove once the deprecated methods are removed. @Test(timeout=20000) - public void testMessageIDFormatOptionApplied() throws Exception { + public void testMessageIDFormatOptionAppliedLegacy() throws Exception { BUILTIN[] formatters = JmsMessageIDBuilder.BUILTIN.values(); for (BUILTIN formatter : formatters) { @@ -156,18 +182,63 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase { // DONT create a test fixture, we will drive everything directly. String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDType=" + formatter.name(); JmsConnectionFactory factory = new JmsConnectionFactory(uri); - assertEquals(formatter.name(), factory.getMessageIDType()); + assertEquals(formatter.name(), ((JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy()).getMessageIDType()); JmsConnection connection = (JmsConnection) factory.createConnection(); - assertEquals(formatter.name(), connection.getMessageIDBuilder().toString()); + assertEquals(formatter.name(), ((JmsDefaultMessageIDPolicy) connection.getMessageIDPolicy()).getMessageIDBuilder().toString()); connection.close(); } } } @Test(timeout=20000) - public void testSetCustomMessageIDBuilder() throws Exception { - TestJmsMessageIdBuilder custom = new TestJmsMessageIdBuilder(); + public void testSetMessageIDFormatOptionAlteredCase() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // DONT create a test fixture, we will drive everything directly. + try { + String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=uuid"; + JmsConnectionFactory factory = new JmsConnectionFactory(uri); + JmsDefaultMessageIDPolicy policy = (JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy(); + assertEquals(JmsMessageIDBuilder.BUILTIN.UUID.name(), policy.getMessageIDType()); + } catch (Exception ex) { + fail("Should have succeeded in creating factory"); + } + + try { + String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=Uuid"; + JmsConnectionFactory factory = new JmsConnectionFactory(uri); + JmsDefaultMessageIDPolicy policy = (JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy(); + assertEquals(JmsMessageIDBuilder.BUILTIN.UUID.name(), policy.getMessageIDType()); + } catch (Exception ex) { + fail("Should have succeeded in creating factory"); + } + } + } + + @Test(timeout=20000) + public void testMessageIDFormatOptionApplied() throws Exception { + BUILTIN[] formatters = JmsMessageIDBuilder.BUILTIN.values(); + + for (BUILTIN formatter : formatters) { + LOG.info("Testing application of Message ID Format: {}", formatter.name()); + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // DONT create a test fixture, we will drive everything directly. + String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=" + formatter.name(); + JmsConnectionFactory factory = new JmsConnectionFactory(uri); + assertEquals(formatter.name(), ((JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy()).getMessageIDType()); + + JmsConnection connection = (JmsConnection) factory.createConnection(); + assertEquals(formatter.name(), ((JmsDefaultMessageIDPolicy) connection.getMessageIDPolicy()).getMessageIDBuilder().toString()); + connection.close(); + } + } + } + + // TODO - Remove once the deprecated methods are removed. + @SuppressWarnings("deprecation") + @Test(timeout=20000) + public void testSetCustomMessageIDBuilderLegacy() throws Exception { + CustomJmsMessageIdBuilder custom = new CustomJmsMessageIdBuilder(); try (TestAmqpPeer testPeer = new TestAmqpPeer();) { // DONT create a test fixture, we will drive everything directly. @@ -178,8 +249,226 @@ public class ConnectionFactoryIntegrationTest extends QpidJmsTestCase { assertEquals(custom.toString(), factory.getMessageIDType()); JmsConnection connection = (JmsConnection) factory.createConnection(); - assertEquals(custom.toString(), connection.getMessageIDBuilder().toString()); + assertEquals(custom.toString(), ((JmsDefaultMessageIDPolicy) connection.getMessageIDPolicy()).getMessageIDBuilder().toString()); connection.close(); } } + + @Test(timeout=20000) + public void testSetCustomMessageIDBuilder() throws Exception { + CustomJmsMessageIdBuilder custom = new CustomJmsMessageIdBuilder(); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // DONT create a test fixture, we will drive everything directly. + String uri = "amqp://127.0.0.1:" + testPeer.getServerPort(); + + JmsConnectionFactory factory = new JmsConnectionFactory(uri); + ((JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy()).setMessageIDBuilder(custom); + assertEquals(custom.toString(), ((JmsDefaultMessageIDPolicy) factory.getMessageIDPolicy()).getMessageIDType()); + + JmsConnection connection = (JmsConnection) factory.createConnection(); + assertEquals(custom.toString(), ((JmsDefaultMessageIDPolicy) connection.getMessageIDPolicy()).getMessageIDBuilder().toString()); + connection.close(); + } + } + + @Test(timeout=20000) + public void testSetCustomMessageIDPolicy() throws Exception { + CustomJmsMessageIDPolicy custom = new CustomJmsMessageIDPolicy(); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + String uri = "amqp://127.0.0.1:" + testPeer.getServerPort(); + + JmsConnectionFactory factory = new JmsConnectionFactory(uri); + factory.setMessageIDPolicy(custom); + assertEquals(custom, factory.getMessageIDPolicy()); + + JmsConnection connection = (JmsConnection) factory.createConnection(); + assertTrue(connection.getMessageIDPolicy() instanceof CustomJmsMessageIDPolicy); + assertNotSame(custom, connection.getMessageIDPolicy()); + connection.close(); + } + } + + @Test(timeout=20000) + public void testSetCustomPrefetchPolicy() throws Exception { + CustomJmsPrefetchPolicy custom = new CustomJmsPrefetchPolicy(); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + String uri = "amqp://127.0.0.1:" + testPeer.getServerPort(); + + JmsConnectionFactory factory = new JmsConnectionFactory(uri); + factory.setPrefetchPolicy(custom); + assertEquals(custom, factory.getPrefetchPolicy()); + + JmsConnection connection = (JmsConnection) factory.createConnection(); + assertTrue(connection.getPrefetchPolicy() instanceof CustomJmsPrefetchPolicy); + assertNotSame(custom, connection.getPrefetchPolicy()); + connection.close(); + } + } + + @Test(timeout=20000) + public void testSetCustomPresettlePolicy() throws Exception { + CustomJmsPresettlePolicy custom = new CustomJmsPresettlePolicy(); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + String uri = "amqp://127.0.0.1:" + testPeer.getServerPort(); + + JmsConnectionFactory factory = new JmsConnectionFactory(uri); + factory.setPresettlePolicy(custom); + assertEquals(custom, factory.getPresettlePolicy()); + + JmsConnection connection = (JmsConnection) factory.createConnection(); + assertTrue(connection.getPresettlePolicy() instanceof CustomJmsPresettlePolicy); + assertNotSame(custom, connection.getPresettlePolicy()); + connection.close(); + } + } + + @Test(timeout=20000) + public void testSetCustomRedeliveryPolicy() throws Exception { + CustomJmsRedeliveryPolicy custom = new CustomJmsRedeliveryPolicy(); + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + String uri = "amqp://127.0.0.1:" + testPeer.getServerPort(); + + JmsConnectionFactory factory = new JmsConnectionFactory(uri); + factory.setRedeliveryPolicy(custom); + assertEquals(custom, factory.getRedeliveryPolicy()); + + JmsConnection connection = (JmsConnection) factory.createConnection(); + assertTrue(connection.getRedeliveryPolicy() instanceof CustomJmsRedeliveryPolicy); + assertNotSame(custom, connection.getRedeliveryPolicy()); + connection.close(); + } + } + + @Test(timeout=10000) + public void testMessageIDPolicyCannotBeNulled() throws Exception { + CustomJmsMessageIDPolicy custom = new CustomJmsMessageIDPolicy(); + + JmsConnectionFactory factory = new JmsConnectionFactory(); + assertTrue(factory.getMessageIDPolicy() instanceof JmsDefaultMessageIDPolicy); + + factory.setMessageIDPolicy(custom); + assertTrue(factory.getMessageIDPolicy() instanceof CustomJmsMessageIDPolicy); + + factory.setMessageIDPolicy(null); + assertTrue(factory.getMessageIDPolicy() instanceof JmsDefaultMessageIDPolicy); + } + + @Test(timeout=10000) + public void testPrefetchPolicyCannotBeNulled() throws Exception { + CustomJmsPrefetchPolicy custom = new CustomJmsPrefetchPolicy(); + + JmsConnectionFactory factory = new JmsConnectionFactory(); + assertTrue(factory.getPrefetchPolicy() instanceof JmsDefaultPrefetchPolicy); + + factory.setPrefetchPolicy(custom); + assertTrue(factory.getPrefetchPolicy() instanceof CustomJmsPrefetchPolicy); + + factory.setPrefetchPolicy(null); + assertTrue(factory.getPrefetchPolicy() instanceof JmsDefaultPrefetchPolicy); + } + + @Test(timeout=10000) + public void testPresettlePolicyCannotBeNulled() throws Exception { + CustomJmsPresettlePolicy custom = new CustomJmsPresettlePolicy(); + + JmsConnectionFactory factory = new JmsConnectionFactory(); + assertTrue(factory.getPresettlePolicy() instanceof JmsDefaultPresettlePolicy); + + factory.setPresettlePolicy(custom); + assertTrue(factory.getPresettlePolicy() instanceof CustomJmsPresettlePolicy); + + factory.setPresettlePolicy(null); + assertTrue(factory.getPresettlePolicy() instanceof JmsDefaultPresettlePolicy); + } + + @Test(timeout=10000) + public void testRedeliveryPolicyCannotBeNulled() throws Exception { + CustomJmsRedeliveryPolicy custom = new CustomJmsRedeliveryPolicy(); + + JmsConnectionFactory factory = new JmsConnectionFactory(); + assertTrue(factory.getRedeliveryPolicy() instanceof JmsDefaultRedeliveryPolicy); + + factory.setRedeliveryPolicy(custom); + assertTrue(factory.getRedeliveryPolicy() instanceof CustomJmsRedeliveryPolicy); + + factory.setRedeliveryPolicy(null); + assertTrue(factory.getRedeliveryPolicy() instanceof JmsDefaultRedeliveryPolicy); + } + + //----- Custom Policy Objects --------------------------------------------// + + private final class CustomJmsMessageIdBuilder implements JmsMessageIDBuilder { + + @Override + public Object createMessageID(String producerId, long messageSequence) { + return UUID.randomUUID(); + } + + @Override + public String toString() { + return "TEST"; + } + } + + private class CustomJmsMessageIDPolicy implements JmsMessageIDPolicy { + + @Override + public JmsMessageIDPolicy copy() { + return new CustomJmsMessageIDPolicy(); + } + + @Override + public JmsMessageIDBuilder getMessageIDBuilder(JmsSession session, JmsDestination destination) { + return JmsMessageIDBuilder.BUILTIN.UUID_STRING.createBuilder(); + } + } + + private class CustomJmsPrefetchPolicy implements JmsPrefetchPolicy { + + @Override + public JmsPrefetchPolicy copy() { + return new CustomJmsPrefetchPolicy(); + } + + @Override + public int getConfiguredPrefetch(JmsSession session, JmsDestination destination, boolean durable, boolean browser) { + return JmsDefaultPrefetchPolicy.DEFAULT_QUEUE_PREFETCH; + } + } + + private class CustomJmsPresettlePolicy implements JmsPresettlePolicy { + + @Override + public JmsPresettlePolicy copy() { + return new CustomJmsPresettlePolicy(); + } + + @Override + public boolean isProducerPresttled(JmsSession session, JmsDestination destination) { + return false; + } + + @Override + public boolean isConsumerPresttled(JmsSession session, JmsDestination destination) { + return false; + } + } + + private class CustomJmsRedeliveryPolicy implements JmsRedeliveryPolicy { + + @Override + public JmsRedeliveryPolicy copy() { + return new CustomJmsRedeliveryPolicy(); + } + + @Override + public int getMaxRedeliveries(JmsDestination destination) { + return JmsDefaultRedeliveryPolicy.DEFAULT_MAX_REDELIVERIES; + } + } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java index 485e5d2..8a03b87 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java @@ -618,8 +618,9 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { } } + // TODO - Remove when the deprecated methods are removed. @Test(timeout=20000) - public void testSendingMessageWithUUIDStringMessageFormat() throws Exception { + public void testSendingMessageWithUUIDStringMessageFormatLegacy() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { // DONT create a test fixture, we will drive everything directly. String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDType=UUID_STRING"; @@ -672,7 +673,61 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { } @Test(timeout=20000) - public void testSendingMessageWithUUIDMessageFormat() throws Exception { + public void testSendingMessageWithUUIDStringMessageFormat() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // DONT create a test fixture, we will drive everything directly. + String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=UUID_STRING"; + JmsConnectionFactory factory = new JmsConnectionFactory(uri); + + Connection connection = factory.createConnection(); + testPeer.expectSaslAnonymousConnect(); + testPeer.expectBegin(); + + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "myQueue"; + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + + String text = "myMessage"; + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(String.class)); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + messageMatcher.setPropertiesMatcher(propsMatcher); + messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); + testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); + + Message message = session.createTextMessage(text); + + assertNull("JMSMessageID should not yet be set", message.getJMSMessageID()); + + producer.send(message); + + String jmsMessageID = message.getJMSMessageID(); + assertNotNull("JMSMessageID should be set", jmsMessageID); + assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:")); + + connection.close(); + + // Get the value that was actually transmitted/received, verify it is a String, compare to what we have locally + testPeer.waitForAllHandlersToComplete(1000); + + Object receivedMessageId = propsMatcher.getReceivedMessageId(); + + assertTrue("Expected UUID message id to be sent", receivedMessageId instanceof String); + assertTrue("Expected JMSMessageId value to be present in AMQP message", jmsMessageID.endsWith(receivedMessageId.toString())); + } + } + + // TODO - Remove when the deprecated methods are removed. + @Test(timeout=20000) + public void testSendingMessageWithUUIDMessageFormatLegacy() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { // DONT create a test fixture, we will drive everything directly. String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDType=UUID"; @@ -724,6 +779,59 @@ public class ProducerIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout=20000) + public void testSendingMessageWithUUIDMessageFormat() throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + // DONT create a test fixture, we will drive everything directly. + String uri = "amqp://127.0.0.1:" + testPeer.getServerPort() + "?jms.messageIDPolicy.messageIDType=UUID"; + JmsConnectionFactory factory = new JmsConnectionFactory(uri); + + Connection connection = factory.createConnection(); + testPeer.expectSaslAnonymousConnect(); + testPeer.expectBegin(); + + testPeer.expectBegin(); + testPeer.expectSenderAttach(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + String queueName = "myQueue"; + Queue queue = session.createQueue(queueName); + MessageProducer producer = session.createProducer(queue); + + String text = "myMessage"; + MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true)); + MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true); + MessagePropertiesSectionMatcher propsMatcher = new MessagePropertiesSectionMatcher(true).withMessageId(isA(UUID.class)); + TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher(); + messageMatcher.setHeadersMatcher(headersMatcher); + messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher); + messageMatcher.setPropertiesMatcher(propsMatcher); + messageMatcher.setMessageContentMatcher(new EncodedAmqpValueMatcher(text)); + testPeer.expectTransfer(messageMatcher); + testPeer.expectClose(); + + Message message = session.createTextMessage(text); + + assertNull("JMSMessageID should not yet be set", message.getJMSMessageID()); + + producer.send(message); + + String jmsMessageID = message.getJMSMessageID(); + assertNotNull("JMSMessageID should be set", jmsMessageID); + assertTrue("JMS 'ID:' prefix not found", jmsMessageID.startsWith("ID:")); + + connection.close(); + + // Get the value that was actually transmitted/received, verify it is a UUID, compare to what we have locally + testPeer.waitForAllHandlersToComplete(1000); + + Object receivedMessageId = propsMatcher.getReceivedMessageId(); + + assertTrue("Expected UUID message id to be sent", receivedMessageId instanceof UUID); + assertTrue("Expected JMSMessageId value to be present in AMQP message", jmsMessageID.endsWith(receivedMessageId.toString())); + } + } + /** * Test that after sending a message with the disableMessageID hint set, the message * object has a null JMSMessageID value, and no message-id field value was set. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/95bc1aa8/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsProducerInfoTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsProducerInfoTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsProducerInfoTest.java index e83e04b..f2371d0 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsProducerInfoTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/meta/JmsProducerInfoTest.java @@ -25,13 +25,11 @@ import static org.junit.Assert.assertTrue; import java.util.concurrent.atomic.AtomicBoolean; import org.apache.qpid.jms.JmsTopic; +import org.apache.qpid.jms.message.JmsMessageIDBuilder; import org.apache.qpid.jms.util.IdGenerator; import org.junit.Before; import org.junit.Test; -/** - * - */ public class JmsProducerInfoTest { private JmsProducerId firstId; @@ -53,33 +51,21 @@ public class JmsProducerInfoTest { secondId = new JmsProducerId(secondSessionId, 2); } - @Test(expected=IllegalArgumentException.class) - public void testExceptionWhenCreatedWithNullConnectionId() { - new JmsProducerInfo(null); - } - - @Test(expected=IllegalArgumentException.class) - public void testExceptionWhenCreatedWithNullSessionInfo() { - new JmsProducerInfo(null, 1); + private JmsProducerInfo createPorducerInfo(JmsProducerId producerId) { + return new JmsProducerInfo(producerId, JmsMessageIDBuilder.BUILTIN.DEFAULT.createBuilder()); } @Test public void testCreateFromProducerId() { - JmsProducerInfo info = new JmsProducerInfo(firstId); + JmsProducerInfo info = createPorducerInfo(firstId); assertSame(firstId, info.getId()); assertSame(firstId.getParentId(), info.getParentId()); assertNotNull(info.toString()); } @Test - public void testCreateFromSessionId() { - JmsProducerInfo info = new JmsProducerInfo(new JmsSessionInfo(firstSessionId), 1); - assertNotNull(info.toString()); - } - - @Test public void testCopy() { - JmsProducerInfo info = new JmsProducerInfo(firstId); + JmsProducerInfo info = createPorducerInfo(firstId); info.setDestination(new JmsTopic("Test")); JmsProducerInfo copy = info.copy(); @@ -90,8 +76,8 @@ public class JmsProducerInfoTest { @Test public void testCompareTo() { - JmsProducerInfo first = new JmsProducerInfo(firstId); - JmsProducerInfo second = new JmsProducerInfo(secondId); + JmsProducerInfo first = createPorducerInfo(firstId); + JmsProducerInfo second = createPorducerInfo(secondId); assertEquals(-1, first.compareTo(second)); assertEquals(0, first.compareTo(first)); @@ -100,8 +86,8 @@ public class JmsProducerInfoTest { @Test public void testHashCode() { - JmsProducerInfo first = new JmsProducerInfo(firstId); - JmsProducerInfo second = new JmsProducerInfo(secondId); + JmsProducerInfo first = createPorducerInfo(firstId); + JmsProducerInfo second = createPorducerInfo(secondId); assertEquals(first.hashCode(), first.hashCode()); assertEquals(second.hashCode(), second.hashCode()); @@ -111,8 +97,8 @@ public class JmsProducerInfoTest { @Test public void testEqualsCode() { - JmsProducerInfo first = new JmsProducerInfo(firstId); - JmsProducerInfo second = new JmsProducerInfo(secondId); + JmsProducerInfo first = createPorducerInfo(firstId); + JmsProducerInfo second = createPorducerInfo(secondId); assertEquals(first, first); assertEquals(second, second); @@ -126,8 +112,7 @@ public class JmsProducerInfoTest { @Test public void testVisit() throws Exception { - final JmsProducerInfo first = new JmsProducerInfo(firstId); - + final JmsProducerInfo first = createPorducerInfo(firstId); final AtomicBoolean visited = new AtomicBoolean(); first.visit(new JmsDefaultResourceVisitor() { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
