Author: jstrachan
Date: Fri Apr 4 08:05:37 2008
New Revision: 644735
URL: http://svn.apache.org/viewvc?rev=644735&view=rev
Log:
patched the JMS configuration so that setting the persistence, priority or
timeToLive will default the explicitQoS flag
Modified:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
Modified:
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
URL:
http://svn.apache.org/viewvc/activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java?rev=644735&r1=644734&r2=644735&view=diff
==============================================================================
---
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
(original)
+++
activemq/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
Fri Apr 4 08:05:37 2008
@@ -81,7 +81,7 @@
private int maxConcurrentConsumers = 1;
// JmsTemplate only
private boolean useVersion102;
- private boolean explicitQosEnabled;
+ private Boolean explicitQosEnabled;
private boolean deliveryPersistent = true;
private long timeToLive = -1;
private MessageConverter messageConverter;
@@ -199,7 +199,7 @@
}
template.setDefaultDestinationName(destination);
- template.setExplicitQosEnabled(explicitQosEnabled);
+ template.setExplicitQosEnabled(isExplicitQosEnabled());
template.setDeliveryPersistent(deliveryPersistent);
if (messageConverter != null) {
template.setMessageConverter(messageConverter);
@@ -239,123 +239,6 @@
return container;
}
- protected void
configureMessageListenerContainer(AbstractMessageListenerContainer container,
- JmsEndpoint endpoint) {
- container.setConnectionFactory(getListenerConnectionFactory());
- if (destinationResolver != null) {
- container.setDestinationResolver(destinationResolver);
- }
- if (autoStartup) {
- container.setAutoStartup(true);
- }
- if (clientId != null) {
- container.setClientId(clientId);
- }
- container.setSubscriptionDurable(subscriptionDurable);
- if (durableSubscriptionName != null) {
- container.setDurableSubscriptionName(durableSubscriptionName);
- }
-
- // lets default to durable subscription if the subscriber name and
- // client ID are specified (as there's
- // no reason to specify them if not! :)
- if (durableSubscriptionName != null && clientId != null) {
- container.setSubscriptionDurable(true);
- }
-
- if (exceptionListener != null) {
- container.setExceptionListener(exceptionListener);
- }
-
- container.setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);
- container.setExposeListenerSession(exposeListenerSession);
- container.setSessionTransacted(transacted);
- if (transacted) {
- container.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
- } else {
- if (acknowledgementMode >= 0) {
- container.setSessionAcknowledgeMode(acknowledgementMode);
- } else if (acknowledgementModeName != null) {
-
container.setSessionAcknowledgeModeName(acknowledgementModeName);
- }
- }
-
- if (container instanceof DefaultMessageListenerContainer) {
- // this includes DefaultMessageListenerContainer102
- DefaultMessageListenerContainer listenerContainer =
(DefaultMessageListenerContainer)container;
- if (concurrentConsumers >= 0) {
- listenerContainer.setConcurrentConsumers(concurrentConsumers);
- }
-
- if (cacheLevel >= 0) {
- listenerContainer.setCacheLevel(cacheLevel);
- } else if (cacheLevelName != null) {
- listenerContainer.setCacheLevelName(cacheLevelName);
- } else {
- listenerContainer.setCacheLevel(defaultCacheLevel(endpoint));
- }
-
- if (idleTaskExecutionLimit >= 0) {
-
listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
- }
- if (maxConcurrentConsumers >= 0) {
-
listenerContainer.setMaxConcurrentConsumers(maxConcurrentConsumers);
- }
- if (maxMessagesPerTask >= 0) {
- listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
- }
- listenerContainer.setPubSubNoLocal(pubSubNoLocal);
- if (receiveTimeout >= 0) {
- listenerContainer.setReceiveTimeout(receiveTimeout);
- }
- if (recoveryInterval >= 0) {
- listenerContainer.setRecoveryInterval(recoveryInterval);
- }
- if (taskExecutor != null) {
- listenerContainer.setTaskExecutor(taskExecutor);
- }
- if (transactionManager != null) {
- listenerContainer.setTransactionManager(transactionManager);
- } else if (transacted) {
- throw new IllegalArgumentException(
- "Property transacted is
enabled but a transactionManager was not injected!");
- }
- if (transactionName != null) {
- listenerContainer.setTransactionName(transactionName);
- }
- if (transactionTimeout >= 0) {
- listenerContainer.setTransactionTimeout(transactionTimeout);
- }
- } else if (container instanceof ServerSessionMessageListenerContainer)
{
- // this includes ServerSessionMessageListenerContainer102
- ServerSessionMessageListenerContainer listenerContainer =
(ServerSessionMessageListenerContainer)container;
- if (maxMessagesPerTask >= 0) {
- listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
- }
- if (serverSessionFactory != null) {
-
listenerContainer.setServerSessionFactory(serverSessionFactory);
- }
- } else if (container instanceof SimpleMessageListenerContainer) {
- // this includes SimpleMessageListenerContainer102
- SimpleMessageListenerContainer listenerContainer =
(SimpleMessageListenerContainer)container;
- if (concurrentConsumers >= 0) {
- listenerContainer.setConcurrentConsumers(concurrentConsumers);
- }
- listenerContainer.setPubSubNoLocal(pubSubNoLocal);
- if (taskExecutor != null) {
- listenerContainer.setTaskExecutor(taskExecutor);
- }
- }
- }
-
- public void configure(EndpointMessageListener listener) {
- if (isDisableReplyTo()) {
- listener.setDisableReplyTo(true);
- }
- if (isEagerLoadingOfProperties()) {
- listener.setEagerLoadingOfProperties(true);
- }
- }
// Properties
//
-------------------------------------------------------------------------
@@ -600,7 +483,7 @@
}
public boolean isExplicitQosEnabled() {
- return explicitQosEnabled;
+ return explicitQosEnabled != null ? explicitQosEnabled : false;
}
public void setExplicitQosEnabled(boolean explicitQosEnabled) {
@@ -613,6 +496,7 @@
public void setDeliveryPersistent(boolean deliveryPersistent) {
this.deliveryPersistent = deliveryPersistent;
+ configuredQoS();
}
public long getTimeToLive() {
@@ -621,6 +505,7 @@
public void setTimeToLive(long timeToLive) {
this.timeToLive = timeToLive;
+ configuredQoS();
}
public MessageConverter getMessageConverter() {
@@ -653,6 +538,7 @@
public void setPriority(int priority) {
this.priority = priority;
+ configuredQoS();
}
public ConsumerType getConsumerType() {
@@ -712,8 +598,168 @@
this.disableReplyTo = disableReplyTo;
}
+ /**
+ * Set to true if you want to send message using the QoS settings specified
+ * on the message. Normally the QoS settings used are the one configured on
+ * this Object.
+ *
+ * @param preserveMessageQos
+ */
+ public void setPreserveMessageQos(boolean preserveMessageQos) {
+ this.preserveMessageQos = preserveMessageQos;
+ }
+
+ public JmsOperations getJmsOperations() {
+ return jmsOperations;
+ }
+
+ public void setJmsOperations(JmsOperations jmsOperations) {
+ this.jmsOperations = jmsOperations;
+ }
+
+ public DestinationResolver getDestinationResolver() {
+ return destinationResolver;
+ }
+
+ public void setDestinationResolver(DestinationResolver
destinationResolver) {
+ this.destinationResolver = destinationResolver;
+ }
+
+ public long getRequestMapPurgePollTimeMillis() {
+ return requestMapPurgePollTimeMillis;
+ }
+
+ /**
+ * Sets the frequency that the requestMap for InOut exchanges is purged for
+ * timed out message exchanges
+ *
+ * @param requestMapPurgePollTimeMillis
+ */
+ public void setRequestMapPurgePollTimeMillis(long
requestMapPurgePollTimeMillis) {
+ this.requestMapPurgePollTimeMillis = requestMapPurgePollTimeMillis;
+ }
+
+
// Implementation methods
//
-------------------------------------------------------------------------
+
+ protected void
configureMessageListenerContainer(AbstractMessageListenerContainer container,
+ JmsEndpoint endpoint) {
+ container.setConnectionFactory(getListenerConnectionFactory());
+ if (destinationResolver != null) {
+ container.setDestinationResolver(destinationResolver);
+ }
+ if (autoStartup) {
+ container.setAutoStartup(true);
+ }
+ if (clientId != null) {
+ container.setClientId(clientId);
+ }
+ container.setSubscriptionDurable(subscriptionDurable);
+ if (durableSubscriptionName != null) {
+ container.setDurableSubscriptionName(durableSubscriptionName);
+ }
+
+ // lets default to durable subscription if the subscriber name and
+ // client ID are specified (as there's
+ // no reason to specify them if not! :)
+ if (durableSubscriptionName != null && clientId != null) {
+ container.setSubscriptionDurable(true);
+ }
+
+ if (exceptionListener != null) {
+ container.setExceptionListener(exceptionListener);
+ }
+
+ container.setAcceptMessagesWhileStopping(acceptMessagesWhileStopping);
+ container.setExposeListenerSession(exposeListenerSession);
+ container.setSessionTransacted(transacted);
+ if (transacted) {
+ container.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
+ } else {
+ if (acknowledgementMode >= 0) {
+ container.setSessionAcknowledgeMode(acknowledgementMode);
+ } else if (acknowledgementModeName != null) {
+
container.setSessionAcknowledgeModeName(acknowledgementModeName);
+ }
+ }
+
+ if (container instanceof DefaultMessageListenerContainer) {
+ // this includes DefaultMessageListenerContainer102
+ DefaultMessageListenerContainer listenerContainer =
(DefaultMessageListenerContainer)container;
+ if (concurrentConsumers >= 0) {
+ listenerContainer.setConcurrentConsumers(concurrentConsumers);
+ }
+
+ if (cacheLevel >= 0) {
+ listenerContainer.setCacheLevel(cacheLevel);
+ } else if (cacheLevelName != null) {
+ listenerContainer.setCacheLevelName(cacheLevelName);
+ } else {
+ listenerContainer.setCacheLevel(defaultCacheLevel(endpoint));
+ }
+
+ if (idleTaskExecutionLimit >= 0) {
+
listenerContainer.setIdleTaskExecutionLimit(idleTaskExecutionLimit);
+ }
+ if (maxConcurrentConsumers >= 0) {
+
listenerContainer.setMaxConcurrentConsumers(maxConcurrentConsumers);
+ }
+ if (maxMessagesPerTask >= 0) {
+ listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
+ }
+ listenerContainer.setPubSubNoLocal(pubSubNoLocal);
+ if (receiveTimeout >= 0) {
+ listenerContainer.setReceiveTimeout(receiveTimeout);
+ }
+ if (recoveryInterval >= 0) {
+ listenerContainer.setRecoveryInterval(recoveryInterval);
+ }
+ if (taskExecutor != null) {
+ listenerContainer.setTaskExecutor(taskExecutor);
+ }
+ if (transactionManager != null) {
+ listenerContainer.setTransactionManager(transactionManager);
+ } else if (transacted) {
+ throw new IllegalArgumentException(
+ "Property transacted is
enabled but a transactionManager was not injected!");
+ }
+ if (transactionName != null) {
+ listenerContainer.setTransactionName(transactionName);
+ }
+ if (transactionTimeout >= 0) {
+ listenerContainer.setTransactionTimeout(transactionTimeout);
+ }
+ } else if (container instanceof ServerSessionMessageListenerContainer)
{
+ // this includes ServerSessionMessageListenerContainer102
+ ServerSessionMessageListenerContainer listenerContainer =
(ServerSessionMessageListenerContainer)container;
+ if (maxMessagesPerTask >= 0) {
+ listenerContainer.setMaxMessagesPerTask(maxMessagesPerTask);
+ }
+ if (serverSessionFactory != null) {
+
listenerContainer.setServerSessionFactory(serverSessionFactory);
+ }
+ } else if (container instanceof SimpleMessageListenerContainer) {
+ // this includes SimpleMessageListenerContainer102
+ SimpleMessageListenerContainer listenerContainer =
(SimpleMessageListenerContainer)container;
+ if (concurrentConsumers >= 0) {
+ listenerContainer.setConcurrentConsumers(concurrentConsumers);
+ }
+ listenerContainer.setPubSubNoLocal(pubSubNoLocal);
+ if (taskExecutor != null) {
+ listenerContainer.setTaskExecutor(taskExecutor);
+ }
+ }
+ }
+
+ public void configure(EndpointMessageListener listener) {
+ if (isDisableReplyTo()) {
+ listener.setDisableReplyTo(true);
+ }
+ if (isEagerLoadingOfProperties()) {
+ listener.setEagerLoadingOfProperties(true);
+ }
+ }
protected AbstractMessageListenerContainer
chooseMessageListenerContainerImplementation() {
// TODO we could allow a spring container to auto-inject these objects?
switch (consumerType) {
@@ -794,43 +840,14 @@
}
/**
- * Set to true if you want to send message using the QoS settings specified
- * on the message. Normally the QoS settings used are the one configured on
- * this Object.
- *
- * @param preserveMessageQos
- */
- public void setPreserveMessageQos(boolean preserveMessageQos) {
- this.preserveMessageQos = preserveMessageQos;
- }
-
- public JmsOperations getJmsOperations() {
- return jmsOperations;
- }
-
- public void setJmsOperations(JmsOperations jmsOperations) {
- this.jmsOperations = jmsOperations;
- }
-
- public DestinationResolver getDestinationResolver() {
- return destinationResolver;
- }
-
- public void setDestinationResolver(DestinationResolver
destinationResolver) {
- this.destinationResolver = destinationResolver;
- }
-
- public long getRequestMapPurgePollTimeMillis() {
- return requestMapPurgePollTimeMillis;
+ * When one of the QoS properties are configured such as [EMAIL PROTECTED]
#setDeliveryPersistent(boolean)},
+ * [EMAIL PROTECTED] #setPriority(int)} or [EMAIL PROTECTED]
#setTimeToLive(long)} then we should auto default the
+ * setting of [EMAIL PROTECTED] #setExplicitQosEnabled(boolean)} if its
not been configured yet
+ */
+ protected void configuredQoS() {
+ if (explicitQosEnabled == null) {
+ explicitQosEnabled = true;
+ }
}
- /**
- * Sets the frequency that the requestMap for InOut exchanges is purged for
- * timed out message exchanges
- *
- * @param requestMapPurgePollTimeMillis
- */
- public void setRequestMapPurgePollTimeMillis(long
requestMapPurgePollTimeMillis) {
- this.requestMapPurgePollTimeMillis = requestMapPurgePollTimeMillis;
- }
}