Author: rajdavies
Date: Tue Apr 11 21:46:22 2006
New Revision: 393383
URL: http://svn.apache.org/viewcvs?rev=393383&view=rev
Log:
Added changes from http://jira.activemq.org/jira//browse/AMQ-660
to allow destination conversion on outbound messages with replyTo destinations
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java?rev=393383&r1=393382&r2=393383&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
Tue Apr 11 21:46:22 2006
@@ -34,7 +34,7 @@
*
* @version $Revision: 1.1.1.1 $
*/
-abstract class DestinationBridge implements Service,MessageListener{
+public abstract class DestinationBridge implements Service,MessageListener{
private static final Log log=LogFactory.getLog(DestinationBridge.class);
protected MessageConsumer consumer;
protected AtomicBoolean started=new AtomicBoolean(false);
@@ -93,32 +93,35 @@
public void stop() throws Exception{
started.set(false);
}
-
+
public void onMessage(Message message){
- if(started.get()&&message!=null){
- try{
- if(doHandleReplyTo){
- Destination replyTo=message.getJMSReplyTo();
- if(replyTo!=null){
- replyTo=processReplyToDestination(replyTo);
- message.setJMSReplyTo(replyTo);
- }
- }else {
- message.setJMSReplyTo(null);
- }
- Message converted=jmsMessageConvertor.convert(message);
- sendMessage(converted);
- message.acknowledge();
- }catch(JMSException e){
- log.error("failed to forward message: "+message,e);
- try{
- stop();
- }catch(Exception e1){
- log.warn("Failed to stop cleanly",e1);
- }
- }
- }
+ if(started.get()&&message!=null){
+ try{
+ Message converted;
+ if(doHandleReplyTo){
+ Destination replyTo = message.getJMSReplyTo();
+ if(replyTo != null){
+ converted =
jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
+ } else {
+ converted =
jmsMessageConvertor.convert(message);
+ }
+ } else {
+ message.setJMSReplyTo(null);
+ converted =
jmsMessageConvertor.convert(message);
+ }
+ sendMessage(converted);
+ message.acknowledge();
+ }catch(JMSException e){
+ log.error("failed to forward message: "+message,e);
+ try{
+ stop();
+ }catch(Exception e1){
+ log.warn("Failed to stop cleanly",e1);
+ }
+ }
+ }
}
+
/**
* @return Returns the doHandleReplyTo.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java?rev=393383&r1=393382&r2=393383&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsMesageConvertor.java
Tue Apr 11 21:46:22 2006
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.network.jms;
+import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -34,5 +36,8 @@
*/
public Message convert(Message message) throws JMSException;
+ public Message convert(Message message, Destination replyTo) throws
JMSException;
+
+ public void setConnection(Connection connection);
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java?rev=393383&r1=393382&r2=393383&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
Tue Apr 11 21:46:22 2006
@@ -16,9 +16,6 @@
*/
package org.apache.activemq.network.jms;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -28,6 +25,9 @@
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.NamingException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
/**
* A Bridge to other JMS Queue providers
*
@@ -44,11 +44,7 @@
private QueueConnection outboundQueueConnection;
private QueueConnection localQueueConnection;
private InboundQueueBridge[] inboundQueueBridges;
- private OutboundQueueBridge[] outboundQueueBridges;
-
-
-
-
+ private OutboundQueueBridge[] outboundQueueBridges;
public boolean init(){
boolean result=super.init();
@@ -56,6 +52,8 @@
try{
initializeForeignQueueConnection();
initializeLocalQueueConnection();
+ initializeInboundJmsMessageConvertor();
+ initializeOutboundJmsMessageConvertor();
initializeInboundQueueBridges();
initializeOutboundQueueBridges();
}catch(Exception e){
@@ -249,6 +247,14 @@
}
localQueueConnection.start();
}
+
+ protected void initializeInboundJmsMessageConvertor(){
+ inboundMessageConvertor.setConnection(localQueueConnection);
+ }
+
+ protected void initializeOutboundJmsMessageConvertor(){
+ outboundMessageConvertor.setConnection(outboundQueueConnection);
+ }
protected void initializeInboundQueueBridges() throws JMSException{
if(inboundQueueBridges!=null){
@@ -287,7 +293,6 @@
bridge.setProducerQueue(foreignQueue);
bridge.setProducerConnection(outboundQueueConnection);
bridge.setConsumerConnection(localQueueConnection);
- bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
@@ -299,38 +304,71 @@
}
}
- protected Destination createReplyToBridge(Destination destination,
Connection consumerConnection, Connection producerConnection){
- Queue queue = (Queue)destination;
- OutboundQueueBridge bridge = (OutboundQueueBridge)
replyToBridges.get(queue);
- if (bridge == null){
- bridge = new OutboundQueueBridge(){
- //we only handle replyTo destinations - inbound
- protected Destination processReplyToDestination (Destination
destination){
- return null;
- }
- };
- try{
- QueueSession localSession =
localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
- Queue localQueue = localSession.createTemporaryQueue();
- localSession.close();
- bridge.setConsumerQueue(localQueue);
- bridge.setProducerQueue(queue);
- bridge.setProducerConnection(outboundQueueConnection);
- bridge.setConsumerConnection(localQueueConnection);
- bridge.setDoHandleReplyTo(false);
- if(bridge.getJmsMessageConvertor()==null){
-
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
- }
- bridge.setJmsConnector(this);
- bridge.start();
- log.info("Created replyTo bridge for " + queue);
- }catch(Exception e){
- log.error("Failed to create replyTo bridge for queue: " +
queue,e);
- return null;
- }
- replyToBridges.put(queue, bridge);
- }
- return bridge.getConsumerQueue();
+ protected Destination createReplyToBridge(Destination destination,
Connection replyToProducerConnection, Connection replyToConsumerConnection){
+ Queue replyToProducerQueue =(Queue)destination;
+ boolean isInbound =
replyToProducerConnection.equals(localQueueConnection);
+
+ if(isInbound){
+ InboundQueueBridge bridge = (InboundQueueBridge)
replyToBridges.get(replyToProducerQueue);
+ if (bridge == null){
+ bridge = new InboundQueueBridge(){
+ protected Destination processReplyToDestination
(Destination destination){
+ return null;
+ }
+ };
+ try{
+ QueueSession replyToConsumerSession =
((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
+ Queue replyToConsumerQueue =
replyToConsumerSession.createTemporaryQueue();
+ replyToConsumerSession.close();
+ bridge.setConsumerQueue(replyToConsumerQueue);
+ bridge.setProducerQueue(replyToProducerQueue);
+
bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
+
bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
+ bridge.setDoHandleReplyTo(false);
+ if(bridge.getJmsMessageConvertor()==null){
+
bridge.setJmsMessageConvertor(getInboundMessageConvertor());
+ }
+ bridge.setJmsConnector(this);
+ bridge.start();
+ log.info("Created replyTo bridge for " +
replyToProducerQueue);
+ }catch(Exception e){
+ log.error("Failed to create replyTo bridge for
queue: " + replyToProducerQueue, e);
+ return null;
+ }
+ replyToBridges.put(replyToProducerQueue, bridge);
+ }
+ return bridge.getConsumerQueue();
+ }else{
+ OutboundQueueBridge bridge = (OutboundQueueBridge)
replyToBridges.get(replyToProducerQueue);
+ if (bridge == null){
+ bridge = new OutboundQueueBridge(){
+ protected Destination processReplyToDestination
(Destination destination){
+ return null;
+ }
+ };
+ try{
+ QueueSession replyToConsumerSession =
((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
+ Queue replyToConsumerQueue =
replyToConsumerSession.createTemporaryQueue();
+ replyToConsumerSession.close();
+ bridge.setConsumerQueue(replyToConsumerQueue);
+ bridge.setProducerQueue(replyToProducerQueue);
+
bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
+
bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
+ bridge.setDoHandleReplyTo(false);
+ if(bridge.getJmsMessageConvertor()==null){
+
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
+ }
+ bridge.setJmsConnector(this);
+ bridge.start();
+ log.info("Created replyTo bridge for " +
replyToProducerQueue);
+ }catch(Exception e){
+ log.error("Failed to create replyTo bridge for
queue: " + replyToProducerQueue, e);
+ return null;
+ }
+ replyToBridges.put(replyToProducerQueue, bridge);
+ }
+ return bridge.getConsumerQueue();
+ }
}
protected Queue createActiveMQQueue(QueueSession session,String queueName)
throws JMSException{
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java?rev=393383&r1=393382&r2=393383&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
Tue Apr 11 21:46:22 2006
@@ -25,6 +25,7 @@
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.NamingException;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -46,16 +47,14 @@
private InboundTopicBridge[] inboundTopicBridges;
private OutboundTopicBridge[] outboundTopicBridges;
-
-
-
-
public boolean init(){
boolean result=super.init();
if(result){
try{
initializeForeignTopicConnection();
initializeLocalTopicConnection();
+ initializeInboundJmsMessageConvertor();
+ initializeOutboundJmsMessageConvertor();
initializeInboundTopicBridges();
initializeOutboundTopicBridges();
}catch(Exception e){
@@ -250,6 +249,14 @@
}
localTopicConnection.start();
}
+
+ protected void initializeInboundJmsMessageConvertor(){
+ inboundMessageConvertor.setConnection(localTopicConnection);
+ }
+
+ protected void initializeOutboundJmsMessageConvertor(){
+ outboundMessageConvertor.setConnection(outboundTopicConnection);
+ }
protected void initializeInboundTopicBridges() throws JMSException{
if(inboundTopicBridges!=null){
@@ -288,7 +295,6 @@
bridge.setProducerTopic(foreignTopic);
bridge.setProducerConnection(outboundTopicConnection);
bridge.setConsumerConnection(localTopicConnection);
- bridge.setDoHandleReplyTo(false);
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
@@ -300,39 +306,71 @@
}
}
- protected Destination createReplyToBridge(Destination destination,
Connection consumerConnection, Connection producerConnection){
- Topic topic =(Topic)destination;
-
- OutboundTopicBridge bridge = (OutboundTopicBridge)
replyToBridges.get(topic);
- if (bridge == null){
- bridge = new OutboundTopicBridge(){
- //we only handle replyTo destinations - inbound
- protected Destination processReplyToDestination (Destination
destination){
- return null;
- }
- };
- try{
- TopicSession localSession =
localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
- Topic localTopic = localSession.createTemporaryTopic();
- localSession.close();
- bridge.setConsumerTopic(localTopic);
- bridge.setProducerTopic(topic);
- bridge.setProducerConnection(outboundTopicConnection);
- bridge.setConsumerConnection(localTopicConnection);
- bridge.setDoHandleReplyTo(false);
- if(bridge.getJmsMessageConvertor()==null){
-
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
- }
- bridge.setJmsConnector(this);
- bridge.start();
- log.info("Created replyTo bridge for " + topic);
- }catch(Exception e){
- log.error("Failed to create replyTo bridge for topic: " +
topic,e);
- return null;
- }
- replyToBridges.put(topic, bridge);
- }
- return bridge.getConsumerTopic();
+ protected Destination createReplyToBridge(Destination destination,
Connection replyToProducerConnection, Connection replyToConsumerConnection){
+ Topic replyToProducerTopic =(Topic)destination;
+ boolean isInbound =
replyToProducerConnection.equals(localTopicConnection);
+
+ if(isInbound){
+ InboundTopicBridge bridge = (InboundTopicBridge)
replyToBridges.get(replyToProducerTopic);
+ if (bridge == null){
+ bridge = new InboundTopicBridge(){
+ protected Destination processReplyToDestination
(Destination destination){
+ return null;
+ }
+ };
+ try{
+ TopicSession replyToConsumerSession =
((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
+ Topic replyToConsumerTopic =
replyToConsumerSession.createTemporaryTopic();
+ replyToConsumerSession.close();
+ bridge.setConsumerTopic(replyToConsumerTopic);
+ bridge.setProducerTopic(replyToProducerTopic);
+
bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
+
bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
+ bridge.setDoHandleReplyTo(false);
+ if(bridge.getJmsMessageConvertor()==null){
+
bridge.setJmsMessageConvertor(getInboundMessageConvertor());
+ }
+ bridge.setJmsConnector(this);
+ bridge.start();
+ log.info("Created replyTo bridge for " +
replyToProducerTopic);
+ }catch(Exception e){
+ log.error("Failed to create replyTo bridge for
topic: " + replyToProducerTopic, e);
+ return null;
+ }
+ replyToBridges.put(replyToProducerTopic, bridge);
+ }
+ return bridge.getConsumerTopic();
+ }else{
+ OutboundTopicBridge bridge = (OutboundTopicBridge)
replyToBridges.get(replyToProducerTopic);
+ if (bridge == null){
+ bridge = new OutboundTopicBridge(){
+ protected Destination processReplyToDestination
(Destination destination){
+ return null;
+ }
+ };
+ try{
+ TopicSession replyToConsumerSession =
((TopicConnection)replyToConsumerConnection).createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
+ Topic replyToConsumerTopic =
replyToConsumerSession.createTemporaryTopic();
+ replyToConsumerSession.close();
+ bridge.setConsumerTopic(replyToConsumerTopic);
+ bridge.setProducerTopic(replyToProducerTopic);
+
bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
+
bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
+ bridge.setDoHandleReplyTo(false);
+ if(bridge.getJmsMessageConvertor()==null){
+
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
+ }
+ bridge.setJmsConnector(this);
+ bridge.start();
+ log.info("Created replyTo bridge for " +
replyToProducerTopic);
+ }catch(Exception e){
+ log.error("Failed to create replyTo bridge for
topic: " + replyToProducerTopic, e);
+ return null;
+ }
+ replyToBridges.put(replyToProducerTopic, bridge);
+ }
+ return bridge.getConsumerTopic();
+ }
}
protected Topic createActiveMQTopic(TopicSession session,String topicName)
throws JMSException{
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java?rev=393383&r1=393382&r2=393383&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/SimpleJmsMessageConvertor.java
Tue Apr 11 21:46:22 2006
@@ -16,6 +16,8 @@
*/
package org.apache.activemq.network.jms;
+import javax.jms.Connection;
+import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -37,6 +39,20 @@
*/
public Message convert(Message message) throws JMSException{
return message;
+ }
+
+ public Message convert(Message message, Destination replyTo) throws
JMSException{
+ Message msg = convert(message);
+ if(replyTo != null) {
+ msg.setJMSReplyTo(replyTo);
+ }else{
+ msg.setJMSReplyTo(null);
+ }
+ return msg;
+ }
+
+ public void setConnection(Connection connection){
+ //do nothing
}