Author: gnodet
Date: Wed Jul 2 14:24:10 2008
New Revision: 673504
URL: http://svn.apache.org/viewvc?rev=673504&view=rev
Log:
SM-1434: servicemix-jms does not honor jms 1.02 when sending the out message as a consumer
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsConsumerEndpoint.java
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java?rev=673504&r1=673503&r2=673504&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/AbstractConsumerEndpoint.java Wed Jul 2 14:24:10 2008
@@ -28,7 +28,13 @@
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
import javax.xml.namespace.QName;
import org.apache.servicemix.common.DefaultComponent;
@@ -39,6 +45,7 @@
import org.apache.servicemix.store.StoreFactory;
import org.apache.servicemix.store.memory.MemoryStoreFactory;
import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.JmsTemplate102;
import org.springframework.jms.core.SessionCallback;
import org.springframework.jms.listener.adapter.ListenerExecutionFailedException;
import org.springframework.jms.support.JmsUtils;
@@ -70,7 +77,8 @@
private boolean stateless;
private StoreFactory storeFactory;
private Store store;
-
+ private boolean jms102;
+
public AbstractConsumerEndpoint() {
super();
}
@@ -311,7 +319,11 @@
public synchronized void start() throws Exception {
super.start();
if (template == null) {
- template = new JmsTemplate(getConnectionFactory());
+ if (isJms102()) {
+ template = new JmsTemplate102(getConnectionFactory(), isPubSubDomain());
+ } else {
+ template = new JmsTemplate(getConnectionFactory());
+ }
}
if (store == null && !stateless) {
if (storeFactory == null) {
@@ -397,13 +409,37 @@
}
protected void send(Message msg, Session session, Destination dest) throws JMSException {
- MessageProducer producer = session.createProducer(dest);
+ MessageProducer producer;
+ if (isJms102()) {
+ if (isPubSubDomain()) {
+ producer = ((TopicSession) session).createPublisher((Topic) dest);
+ } else {
+ producer = ((QueueSession) session).createSender((Queue) dest);
+ }
+ } else {
+ producer = session.createProducer(dest);
+ }
try {
if (replyProperties != null) {
for (Map.Entry<String, Object> e : replyProperties.entrySet()) {
msg.setObjectProperty(e.getKey(), e.getValue());
}
}
+ if (isJms102()) {
+ if (isPubSubDomain()) {
+ if (replyExplicitQosEnabled) {
+ ((TopicPublisher) producer).publish(msg, replyDeliveryMode, replyPriority, replyTimeToLive);
+ } else {
+ ((TopicPublisher) producer).publish(msg);
+ }
+ } else {
+ if (replyExplicitQosEnabled) {
+ ((QueueSender) producer).send(msg, replyDeliveryMode, replyPriority, replyTimeToLive);
+ } else {
+ ((QueueSender) producer).send(msg);
+ }
+ }
+ }
if (replyExplicitQosEnabled) {
producer.send(msg, replyDeliveryMode, replyPriority, replyTimeToLive);
} else {
@@ -553,5 +589,19 @@
setCorrelationId(context.getMessage(), msg);
send(msg, session, dest);
}
-
+
+ /**
+ * @return the jms102
+ */
+ public boolean isJms102() {
+ return jms102;
+ }
+
+ /**
+ * @param jms102 the jms102 to set
+ */
+ public void setJms102(boolean jms102) {
+ this.jms102 = jms102;
+ }
+
}
Modified:
servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsConsumerEndpoint.java
URL:
http://svn.apache.org/viewvc/servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsConsumerEndpoint.java?rev=673504&r1=673503&r2=673504&view=diff
==============================================================================
--- servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsConsumerEndpoint.java (original)
+++ servicemix/smx3/branches/servicemix-3.2/deployables/bindingcomponents/servicemix-jms/src/main/java/org/apache/servicemix/jms/endpoints/JmsConsumerEndpoint.java Wed Jul 2 14:24:10 2008
@@ -61,7 +61,6 @@
// type of listener
private String listenerType = LISTENER_TYPE_DEFAULT;
- private boolean jms102;
private String transacted = TRANSACTED_NONE;
// Standard jms properties
@@ -226,20 +225,6 @@
}
/**
- * @return the jms102
- */
- public boolean isJms102() {
- return jms102;
- }
-
- /**
- * @param jms102 the jms102 to set
- */
- public void setJms102(boolean jms102) {
- this.jms102 = jms102;
- }
-
- /**
* @return the listenerType
*/
public String getListenerType() {
@@ -465,7 +450,7 @@
private AbstractMessageListenerContainer createServerSessionMessageListenerContainer() {
final ServerSessionMessageListenerContainer cont;
- if (jms102) {
+ if (isJms102()) {
cont = new ServerSessionMessageListenerContainer102();
} else {
cont = new ServerSessionMessageListenerContainer();
@@ -480,7 +465,7 @@
private AbstractMessageListenerContainer createSimpleMessageListenerContainer() {
final SimpleMessageListenerContainer cont;
- if (jms102) {
+ if (isJms102()) {
cont = new SimpleMessageListenerContainer102();
} else {
cont = new SimpleMessageListenerContainer();
@@ -496,7 +481,7 @@
private AbstractMessageListenerContainer createDefaultMessageListenerContainer() {
final DefaultMessageListenerContainer cont;
- if (jms102) {
+ if (isJms102()) {
cont = new DefaultMessageListenerContainer102();
} else {
cont = new DefaultMessageListenerContainer();
@@ -519,7 +504,7 @@
}
} else if (TRANSACTED_JMS.equals(transacted)) {
cont.setSessionTransacted(true);
- if (jms102) {
+ if (isJms102()) {
cont.setTransactionManager(new JmsTransactionManager102(getConnectionFactory(), isPubSubDomain()));
} else {
cont.setTransactionManager(new JmsTransactionManager(getConnectionFactory()));