Author: rajdavies
Date: Wed Feb 8 10:26:35 2006
New Revision: 376019
URL: http://svn.apache.org/viewcvs?rev=376019&view=rev
Log:
rationalize the QueueBridge/TopicBridge
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java?rev=376019&r1=376018&r2=376019&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java Wed Feb 8 10:26:35 2006
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.network.jms;
+import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -39,6 +40,7 @@
protected AtomicBoolean started=new AtomicBoolean(false);
protected JmsMesageConvertor jmsMessageConvertor;
protected boolean doHandleReplyTo = true;
+ protected JmsConnector jmsConnector;
/**
* @return Returns the consumer.
@@ -56,6 +58,12 @@
}
/**
+ * @param connector
+ */
+ public void setJmsConnector(JmsConnector connector){
+ this.jmsConnector = connector;
+ }
+ /**
* @return Returns the inboundMessageConvertor.
*/
public JmsMesageConvertor getJmsMessageConvertor(){
@@ -63,13 +71,17 @@
}
/**
- * @param inboundMessageConvertor
- * The inboundMessageConvertor to set.
+ * @param jmsMessageConvertor
*/
public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor){
this.jmsMessageConvertor=jmsMessageConvertor;
}
+
+ protected Destination processReplyToDestination (Destination destination){
+ return jmsConnector.createReplyToBridge(destination, getConsumerConnection(), getProducerConnection());
+ }
+
public void start() throws Exception{
if(started.compareAndSet(false,true)){
MessageConsumer consumer=createConsumer();
@@ -128,7 +140,9 @@
protected abstract void sendMessage(Message message) throws JMSException;
- protected abstract Destination processReplyToDestination(Destination destination);
+ protected abstract Connection getConsumerConnection();
+
+ protected abstract Connection getProducerConnection();
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java?rev=376019&r1=376018&r2=376019&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java Wed Feb 8 10:26:35 2006
@@ -19,6 +19,10 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import javax.jms.Connection;
+import javax.jms.Destination;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
@@ -44,10 +48,14 @@
protected JmsMesageConvertor outboundMessageConvertor;
private List inboundBridges = new CopyOnWriteArrayList();
private List outboundBridges = new CopyOnWriteArrayList();
- protected int replyToDestinationCacheSize=10000;
protected AtomicBoolean initialized = new AtomicBoolean(false);
protected AtomicBoolean started = new AtomicBoolean(false);
protected ActiveMQConnectionFactory embeddedConnectionFactory;
+ protected int replyToDestinationCacheSize=10000;
+ protected String outboundUsername;
+ protected String outboundPassword;
+ protected String localUsername;
+ protected String localPassword;
protected LRUCache replyToBridges=new LRUCache(){
protected boolean removeEldestEntry(Map.Entry enty){
if(size()>maxCacheSize){
@@ -113,6 +121,8 @@
}
}
+ protected abstract Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection);
+
/**
* One way to configure the local connection - this is called by
* The BrokerService when the Connector is embedded
@@ -196,6 +206,62 @@
}
+ /**
+ * @return Returns the localPassword.
+ */
+ public String getLocalPassword(){
+ return localPassword;
+ }
+
+ /**
+ * @param localPassword The localPassword to set.
+ */
+ public void setLocalPassword(String localPassword){
+ this.localPassword=localPassword;
+ }
+
+ /**
+ * @return Returns the localUsername.
+ */
+ public String getLocalUsername(){
+ return localUsername;
+ }
+
+ /**
+ * @param localUsername The localUsername to set.
+ */
+ public void setLocalUsername(String localUsername){
+ this.localUsername=localUsername;
+ }
+
+ /**
+ * @return Returns the outboundPassword.
+ */
+ public String getOutboundPassword(){
+ return outboundPassword;
+ }
+
+ /**
+ * @param outboundPassword The outboundPassword to set.
+ */
+ public void setOutboundPassword(String outboundPassword){
+ this.outboundPassword=outboundPassword;
+ }
+
+ /**
+ * @return Returns the outboundUsername.
+ */
+ public String getOutboundUsername(){
+ return outboundUsername;
+ }
+
+ /**
+ * @param outboundUsername The outboundUsername to set.
+ */
+ public void setOutboundUsername(String outboundUsername){
+ this.outboundUsername=outboundUsername;
+ }
+
protected void addInboundBridge(DestinationBridge bridge){
inboundBridges.add(bridge);
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java?rev=376019&r1=376018&r2=376019&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java Wed Feb 8 10:26:35 2006
@@ -19,6 +19,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Queue;
@@ -44,10 +45,7 @@
private QueueConnection localQueueConnection;
private InboundQueueBridge[] inboundQueueBridges;
private OutboundQueueBridge[] outboundQueueBridges;
- private String outboundUsername;
- private String outboundPassword;
- private String localUsername;
- private String localPassword;
+
@@ -189,80 +187,7 @@
this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
}
- /**
- * @return Returns the outboundPassword.
- */
- public String getOutboundPassword(){
- return outboundPassword;
- }
-
- /**
- * @param outboundPassword
- * The outboundPassword to set.
- */
- public void setOutboundPassword(String foreignPassword){
- this.outboundPassword=foreignPassword;
- }
-
- /**
- * @return Returns the outboundUsername.
- */
- public String getOutboundUsername(){
- return outboundUsername;
- }
-
- /**
- * @param outboundUsername
- * The outboundUsername to set.
- */
- public void setOutboundUsername(String foreignUsername){
- this.outboundUsername=foreignUsername;
- }
-
- /**
- * @return Returns the localPassword.
- */
- public String getLocalPassword(){
- return localPassword;
- }
-
- /**
- * @param localPassword
- * The localPassword to set.
- */
- public void setLocalPassword(String localPassword){
- this.localPassword=localPassword;
- }
-
- /**
- * @return Returns the localUsername.
- */
- public String getLocalUsername(){
- return localUsername;
- }
-
- /**
- * @param localUsername
- * The localUsername to set.
- */
- public void setLocalUsername(String localUsername){
- this.localUsername=localUsername;
- }
- /**
- * @return Returns the replyToDestinationCacheSize.
- */
- public int getReplyToDestinationCacheSize(){
- return replyToDestinationCacheSize;
- }
-
- /**
- * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
- */
- public void setReplyToDestinationCacheSize(int temporaryQueueCacheSize){
- this.replyToDestinationCacheSize=temporaryQueueCacheSize;
- }
-
protected void initializeForeignQueueConnection() throws NamingException,JMSException{
if(outboundQueueConnection==null){
// get the connection factories
@@ -341,7 +266,7 @@
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getInboundMessageConvertor());
}
- bridge.setJmsQueueConnector(this);
+ bridge.setJmsConnector(this);
addInboundBridge(bridge);
}
outboundSession.close();
@@ -366,7 +291,7 @@
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
- bridge.setJmsQueueConnector(this);
+ bridge.setJmsConnector(this);
addOutboundBridge(bridge);
}
outboundSession.close();
@@ -374,7 +299,8 @@
}
}
- protected Destination createReplyToQueueBridge(Queue queue, QueueConnection consumerConnection, QueueConnection producerConnection){
+ protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){
+ Queue queue = (Queue)destination;
OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(queue);
if (bridge == null){
bridge = new OutboundQueueBridge(){
@@ -395,7 +321,7 @@
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
- bridge.setJmsQueueConnector(this);
+ bridge.setJmsConnector(this);
bridge.start();
log.info("Created replyTo bridge for " + queue);
}catch(Exception e){
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java?rev=376019&r1=376018&r2=376019&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java Wed Feb 8 10:26:35 2006
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.network.jms;
+import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Session;
@@ -44,10 +45,7 @@
private TopicConnection localTopicConnection;
private InboundTopicBridge[] inboundTopicBridges;
private OutboundTopicBridge[] outboundTopicBridges;
- private String outboundUsername;
- private String outboundPassword;
- private String localUsername;
- private String localPassword;
+
@@ -189,79 +187,7 @@
this.outboundTopicConnectionFactory=foreignTopicConnectionFactory;
}
- /**
- * @return Returns the outboundPassword.
- */
- public String getOutboundPassword(){
- return outboundPassword;
- }
-
- /**
- * @param outboundPassword
- * The outboundPassword to set.
- */
- public void setOutboundPassword(String foreignPassword){
- this.outboundPassword=foreignPassword;
- }
-
- /**
- * @return Returns the outboundUsername.
- */
- public String getOutboundUsername(){
- return outboundUsername;
- }
-
- /**
- * @param outboundUsername
- * The outboundUsername to set.
- */
- public void setOutboundUsername(String foreignUsername){
- this.outboundUsername=foreignUsername;
- }
-
- /**
- * @return Returns the localPassword.
- */
- public String getLocalPassword(){
- return localPassword;
- }
-
- /**
- * @param localPassword
- * The localPassword to set.
- */
- public void setLocalPassword(String localPassword){
- this.localPassword=localPassword;
- }
-
- /**
- * @return Returns the localUsername.
- */
- public String getLocalUsername(){
- return localUsername;
- }
-
- /**
- * @param localUsername
- * The localUsername to set.
- */
- public void setLocalUsername(String localUsername){
- this.localUsername=localUsername;
- }
- /**
- * @return Returns the replyToDestinationCacheSize.
- */
- public int getReplyToDestinationCacheSize(){
- return replyToDestinationCacheSize;
- }
-
- /**
- * @param replyToDestinationCacheSize The replyToDestinationCacheSize to set.
- */
- public void setReplyToDestinationCacheSize(int temporaryTopicCacheSize){
- this.replyToDestinationCacheSize=temporaryTopicCacheSize;
- }
protected void initializeForeignTopicConnection() throws NamingException,JMSException{
if(outboundTopicConnection==null){
@@ -341,7 +267,7 @@
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getInboundMessageConvertor());
}
- bridge.setJmsTopicConnector(this);
+ bridge.setJmsConnector(this);
addInboundBridge(bridge);
}
outboundSession.close();
@@ -366,7 +292,7 @@
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
- bridge.setJmsTopicConnector(this);
+ bridge.setJmsConnector(this);
addOutboundBridge(bridge);
}
outboundSession.close();
@@ -374,7 +300,9 @@
}
}
- protected Destination createReplyToTopicBridge(Topic topic, TopicConnection consumerConnection, TopicConnection producerConnection){
+ protected Destination createReplyToBridge(Destination destination, Connection consumerConnection, Connection producerConnection){
+ Topic topic =(Topic)destination;
+
OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(topic);
if (bridge == null){
bridge = new OutboundTopicBridge(){
@@ -395,7 +323,7 @@
if(bridge.getJmsMessageConvertor()==null){
bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
}
- bridge.setJmsTopicConnector(this);
+ bridge.setJmsConnector(this);
bridge.start();
log.info("Created replyTo bridge for " + topic);
}catch(Exception e){
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java?rev=376019&r1=376018&r2=376019&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java Wed Feb 8 10:26:35 2006
@@ -42,7 +42,7 @@
protected QueueSender producer;
protected QueueConnection consumerConnection;
protected QueueConnection producerConnection;
- protected JmsQueueConnector jmsQueueConnector;
+
public void stop() throws Exception{
super.stop();
@@ -54,9 +54,7 @@
}
}
- protected void setJmsQueueConnector(JmsQueueConnector connector){
- this.jmsQueueConnector = connector;
- }
+
protected MessageConsumer createConsumer() throws JMSException{
// set up the consumer
@@ -79,12 +77,7 @@
}
- protected Destination processReplyToDestination (Destination destination){
- Queue queue = (Queue)destination;
- return jmsQueueConnector.createReplyToQueueBridge(queue, getConsumerConnection(), getProducerConnection());
- }
-
-
+
protected void sendMessage(Message message) throws JMSException{
producer.send(producerQueue,message);
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java?rev=376019&r1=376018&r2=376019&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java Wed Feb 8 10:26:35 2006
@@ -41,7 +41,7 @@
protected TopicPublisher producer;
protected TopicConnection consumerConnection;
protected TopicConnection producerConnection;
- protected JmsTopicConnector jmsTopicConnector;
+
public void stop() throws Exception{
super.stop();
@@ -53,9 +53,7 @@
}
}
- protected void setJmsTopicConnector(JmsTopicConnector connector){
- this.jmsTopicConnector = connector;
- }
+
protected MessageConsumer createConsumer() throws JMSException{
// set up the consumer
@@ -78,10 +76,7 @@
return consumer;
}
- protected Destination processReplyToDestination (Destination destination){
- Topic topic = (Topic)destination;
- return jmsTopicConnector.createReplyToTopicBridge(topic, getConsumerConnection(), getProducerConnection());
- }
+
protected MessageProducer createProducer() throws JMSException{
producer = producerSession.createPublisher(null);