Author: jstrachan
Date: Fri Aug 25 04:01:30 2006
New Revision: 436752
URL: http://svn.apache.org/viewvc?rev=436752&view=rev
Log:
attempt to reconnect to the remote JMS broker if we get a fail sending a
message to it - an attempt at fixing AMQ-895
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java?rev=436752&r1=436751&r2=436752&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/DestinationBridge.java
Fri Aug 25 04:01:30 2006
@@ -17,6 +17,12 @@
*/
package org.apache.activemq.network.jms;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.activemq.Service;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -24,29 +30,26 @@
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
+import javax.naming.NamingException;
-import org.apache.activemq.Service;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* A Destination bridge is used to bridge between to different JMS systems
*
* @version $Revision: 1.1.1.1 $
*/
-public abstract class DestinationBridge implements Service,MessageListener{
- private static final Log log=LogFactory.getLog(DestinationBridge.class);
+public abstract class DestinationBridge implements Service, MessageListener {
+ private static final Log log = LogFactory.getLog(DestinationBridge.class);
protected MessageConsumer consumer;
- protected AtomicBoolean started=new AtomicBoolean(false);
+ protected AtomicBoolean started = new AtomicBoolean(false);
protected JmsMesageConvertor jmsMessageConvertor;
protected boolean doHandleReplyTo = true;
protected JmsConnector jmsConnector;
+ private int maximumRetries = 10;
/**
* @return Returns the consumer.
*/
- public MessageConsumer getConsumer(){
+ public MessageConsumer getConsumer() {
return consumer;
}
@@ -54,88 +57,110 @@
* @param consumer
* The consumer to set.
*/
- public void setConsumer(MessageConsumer consumer){
- this.consumer=consumer;
+ public void setConsumer(MessageConsumer consumer) {
+ this.consumer = consumer;
}
/**
* @param connector
*/
- public void setJmsConnector(JmsConnector connector){
+ public void setJmsConnector(JmsConnector connector) {
this.jmsConnector = connector;
}
+
/**
* @return Returns the inboundMessageConvertor.
*/
- public JmsMesageConvertor getJmsMessageConvertor(){
+ public JmsMesageConvertor getJmsMessageConvertor() {
return jmsMessageConvertor;
}
/**
- * @param jmsMessageConvertor
+ * @param jmsMessageConvertor
+ */
+ public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor)
{
+ this.jmsMessageConvertor = jmsMessageConvertor;
+ }
+
+ public int getMaximumRetries() {
+ return maximumRetries;
+ }
+
+ /**
+ * Sets the maximum number of retries if a send fails before closing the
+ * bridge
*/
- public void setJmsMessageConvertor(JmsMesageConvertor jmsMessageConvertor){
- this.jmsMessageConvertor=jmsMessageConvertor;
+ public void setMaximumRetries(int maximumRetries) {
+ this.maximumRetries = maximumRetries;
}
-
- protected Destination processReplyToDestination (Destination destination){
+ protected Destination processReplyToDestination(Destination destination) {
return jmsConnector.createReplyToBridge(destination,
getConnnectionForConsumer(), getConnectionForProducer());
}
-
- public void start() throws Exception{
- if(started.compareAndSet(false,true)){
- MessageConsumer consumer=createConsumer();
+
+ public void start() throws Exception {
+ if (started.compareAndSet(false, true)) {
+ MessageConsumer consumer = createConsumer();
consumer.setMessageListener(this);
createProducer();
}
}
- public void stop() throws Exception{
+ public void stop() throws Exception {
started.set(false);
}
-
- public void onMessage(Message message){
- if(started.get()&&message!=null){
- try{
- Message converted;
- if(doHandleReplyTo){
- Destination replyTo = message.getJMSReplyTo();
- if(replyTo != null){
- converted =
jmsMessageConvertor.convert(message, processReplyToDestination(replyTo));
- } else {
- converted =
jmsMessageConvertor.convert(message);
- }
- } else {
- message.setJMSReplyTo(null);
- converted =
jmsMessageConvertor.convert(message);
- }
- sendMessage(converted);
- message.acknowledge();
- }catch(JMSException e){
- log.error("failed to forward message: "+message,e);
- try{
- stop();
- }catch(Exception e1){
- log.warn("Failed to stop cleanly",e1);
- }
- }
- }
+
+ public void onMessage(Message message) {
+ if (started.get() && message != null) {
+ int attempt = 0;
+ try {
+ if (attempt > 0) {
+ restartProducer();
+ }
+ Message converted;
+ if (doHandleReplyTo) {
+ Destination replyTo = message.getJMSReplyTo();
+ if (replyTo != null) {
+ converted = jmsMessageConvertor.convert(message,
processReplyToDestination(replyTo));
+ }
+ else {
+ converted = jmsMessageConvertor.convert(message);
+ }
+ }
+ else {
+ message.setJMSReplyTo(null);
+ converted = jmsMessageConvertor.convert(message);
+ }
+ sendMessage(converted);
+ message.acknowledge();
+ }
+ catch (Exception e) {
+ log.error("failed to forward message on attempt: " +
(++attempt) + " reason: " + e + " message: " + message, e);
+ if (maximumRetries > 0 && attempt >= maximumRetries) {
+ try {
+ stop();
+ }
+ catch (Exception e1) {
+ log.warn("Failed to stop cleanly", e1);
+ }
+ }
+ }
+ }
}
-
/**
* @return Returns the doHandleReplyTo.
*/
- protected boolean isDoHandleReplyTo(){
+ protected boolean isDoHandleReplyTo() {
return doHandleReplyTo;
}
/**
- * @param doHandleReplyTo The doHandleReplyTo to set.
+ * @param doHandleReplyTo
+ * The doHandleReplyTo to set.
*/
- protected void setDoHandleReplyTo(boolean doHandleReplyTo){
- this.doHandleReplyTo=doHandleReplyTo;
+ protected void setDoHandleReplyTo(boolean doHandleReplyTo) {
+ this.doHandleReplyTo = doHandleReplyTo;
}
protected abstract MessageConsumer createConsumer() throws JMSException;
@@ -145,8 +170,17 @@
protected abstract void sendMessage(Message message) throws JMSException;
protected abstract Connection getConnnectionForConsumer();
-
+
protected abstract Connection getConnectionForProducer();
-
+ protected void restartProducer() throws JMSException, NamingException {
+ try {
+ getConnectionForProducer().close();
+ }
+ catch (Exception e) {
+ log.debug("Ignoring failure to close producer connection: " + e,
e);
+ }
+ jmsConnector.restartProducerConnection();
+ createProducer();
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java?rev=436752&r1=436751&r2=436752&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsConnector.java
Fri Aug 25 04:01:30 2006
@@ -23,6 +23,8 @@
import javax.jms.Connection;
import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.naming.NamingException;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.Service;
@@ -36,14 +38,15 @@
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
- * This bridge joins the gap between foreign JMS providers and ActiveMQ As
some JMS providers are still only 1.0.1
- * compliant, this bridge itself aimed to be JMS 1.0.2 compliant.
+ * This bridge joins the gap between foreign JMS providers and ActiveMQ As some
+ * JMS providers are still only 1.0.1 compliant, this bridge itself aimed to be
+ * JMS 1.0.2 compliant.
*
* @version $Revision: 1.1.1.1 $
*/
-public abstract class JmsConnector implements Service{
-
- private static final Log log=LogFactory.getLog(JmsConnector.class);
+public abstract class JmsConnector implements Service {
+
+ private static final Log log = LogFactory.getLog(JmsConnector.class);
protected JndiTemplate jndiLocalTemplate;
protected JndiTemplate jndiOutboundTemplate;
protected JmsMesageConvertor inboundMessageConvertor;
@@ -52,101 +55,103 @@
private List outboundBridges = new CopyOnWriteArrayList();
protected AtomicBoolean initialized = new AtomicBoolean(false);
protected AtomicBoolean started = new AtomicBoolean(false);
- protected ActiveMQConnectionFactory embeddedConnectionFactory;
- protected int replyToDestinationCacheSize=10000;
+ protected ActiveMQConnectionFactory embeddedConnectionFactory;
+ protected int replyToDestinationCacheSize = 10000;
protected String outboundUsername;
protected String outboundPassword;
protected String localUsername;
protected String localPassword;
private String name;
-
- protected LRUCache replyToBridges=new LRUCache(){
+
+ protected LRUCache replyToBridges = new LRUCache() {
/**
*
*/
private static final long serialVersionUID = -7446792754185879286L;
- protected boolean removeEldestEntry(Map.Entry enty){
- if(size()>maxCacheSize){
- Iterator iter=entrySet().iterator();
- Map.Entry lru=(Map.Entry) iter.next();
+ protected boolean removeEldestEntry(Map.Entry enty) {
+ if (size() > maxCacheSize) {
+ Iterator iter = entrySet().iterator();
+ Map.Entry lru = (Map.Entry) iter.next();
remove(lru.getKey());
- DestinationBridge bridge=(DestinationBridge) lru.getValue();
- try{
+ DestinationBridge bridge = (DestinationBridge) lru.getValue();
+ try {
bridge.stop();
- log.info("Expired bridge: "+bridge);
- }catch(Exception e){
- log.warn("stopping expired bridge"+bridge+" caused an
exception",e);
+ log.info("Expired bridge: " + bridge);
+ }
+ catch (Exception e) {
+ log.warn("stopping expired bridge" + bridge + " caused an
exception", e);
}
}
return false;
}
};
- public boolean init(){
- boolean result=initialized.compareAndSet(false,true);
- if(result){
- if(jndiLocalTemplate==null){
- jndiLocalTemplate=new JndiTemplate();
+ public boolean init() {
+ boolean result = initialized.compareAndSet(false, true);
+ if (result) {
+ if (jndiLocalTemplate == null) {
+ jndiLocalTemplate = new JndiTemplate();
}
- if(jndiOutboundTemplate==null){
- jndiOutboundTemplate=new JndiTemplate();
+ if (jndiOutboundTemplate == null) {
+ jndiOutboundTemplate = new JndiTemplate();
}
- if(inboundMessageConvertor==null){
- inboundMessageConvertor=new SimpleJmsMessageConvertor();
+ if (inboundMessageConvertor == null) {
+ inboundMessageConvertor = new SimpleJmsMessageConvertor();
}
- if (outboundMessageConvertor==null){
- outboundMessageConvertor=new SimpleJmsMessageConvertor();
+ if (outboundMessageConvertor == null) {
+ outboundMessageConvertor = new SimpleJmsMessageConvertor();
}
replyToBridges.setMaxCacheSize(getReplyToDestinationCacheSize());
}
return result;
}
-
- public void start() throws Exception{
+
+ public void start() throws Exception {
init();
- if (started.compareAndSet(false, true)){
- for(int i=0;i<inboundBridges.size();i++){
- DestinationBridge bridge=(DestinationBridge)
inboundBridges.get(i);
+ if (started.compareAndSet(false, true)) {
+ for (int i = 0; i < inboundBridges.size(); i++) {
+ DestinationBridge bridge = (DestinationBridge)
inboundBridges.get(i);
bridge.start();
}
- for(int i=0;i<outboundBridges.size();i++){
- DestinationBridge bridge=(DestinationBridge)
outboundBridges.get(i);
+ for (int i = 0; i < outboundBridges.size(); i++) {
+ DestinationBridge bridge = (DestinationBridge)
outboundBridges.get(i);
bridge.start();
}
- log.info("JMS Connector "+getName()+" Started");
+ log.info("JMS Connector " + getName() + " Started");
}
}
- public void stop() throws Exception{
- if(started.compareAndSet(true,false)){
- for(int i=0;i<inboundBridges.size();i++){
- DestinationBridge bridge=(DestinationBridge)
inboundBridges.get(i);
+ public void stop() throws Exception {
+ if (started.compareAndSet(true, false)) {
+ for (int i = 0; i < inboundBridges.size(); i++) {
+ DestinationBridge bridge = (DestinationBridge)
inboundBridges.get(i);
bridge.stop();
}
- for(int i=0;i<outboundBridges.size();i++){
- DestinationBridge bridge=(DestinationBridge)
outboundBridges.get(i);
+ for (int i = 0; i < outboundBridges.size(); i++) {
+ DestinationBridge bridge = (DestinationBridge)
outboundBridges.get(i);
bridge.stop();
}
- log.info("JMS Connector "+getName()+" Stopped");
+ log.info("JMS Connector " + getName() + " Stopped");
}
}
-
+
protected abstract Destination createReplyToBridge(Destination
destination, Connection consumerConnection, Connection producerConnection);
-
+
/**
- * One way to configure the local connection - this is called by
- * The BrokerService when the Connector is embedded
+ * One way to configure the local connection - this is called by The
+ * BrokerService when the Connector is embedded
+ *
* @param service
*/
- public void setBrokerService(BrokerService service){
+ public void setBrokerService(BrokerService service) {
embeddedConnectionFactory = new
ActiveMQConnectionFactory(service.getVmConnectorURI());
}
/**
* @return Returns the jndiTemplate.
*/
- public JndiTemplate getJndiLocalTemplate(){
+ public JndiTemplate getJndiLocalTemplate() {
return jndiLocalTemplate;
}
@@ -154,28 +159,29 @@
* @param jndiTemplate
* The jndiTemplate to set.
*/
- public void setJndiLocalTemplate(JndiTemplate jndiTemplate){
- this.jndiLocalTemplate=jndiTemplate;
+ public void setJndiLocalTemplate(JndiTemplate jndiTemplate) {
+ this.jndiLocalTemplate = jndiTemplate;
}
/**
* @return Returns the jndiOutboundTemplate.
*/
- public JndiTemplate getJndiOutboundTemplate(){
+ public JndiTemplate getJndiOutboundTemplate() {
return jndiOutboundTemplate;
}
/**
- * @param jndiOutboundTemplate The jndiOutboundTemplate to set.
+ * @param jndiOutboundTemplate
+ * The jndiOutboundTemplate to set.
*/
- public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate){
- this.jndiOutboundTemplate=jndiOutboundTemplate;
+ public void setJndiOutboundTemplate(JndiTemplate jndiOutboundTemplate) {
+ this.jndiOutboundTemplate = jndiOutboundTemplate;
}
/**
* @return Returns the inboundMessageConvertor.
*/
- public JmsMesageConvertor getInboundMessageConvertor(){
+ public JmsMesageConvertor getInboundMessageConvertor() {
return inboundMessageConvertor;
}
@@ -183,28 +189,29 @@
* @param inboundMessageConvertor
* The inboundMessageConvertor to set.
*/
- public void setInboundMessageConvertor(JmsMesageConvertor
jmsMessageConvertor){
- this.inboundMessageConvertor=jmsMessageConvertor;
+ public void setInboundMessageConvertor(JmsMesageConvertor
jmsMessageConvertor) {
+ this.inboundMessageConvertor = jmsMessageConvertor;
}
/**
* @return Returns the outboundMessageConvertor.
*/
- public JmsMesageConvertor getOutboundMessageConvertor(){
+ public JmsMesageConvertor getOutboundMessageConvertor() {
return outboundMessageConvertor;
}
/**
- * @param outboundMessageConvertor The outboundMessageConvertor to set.
+ * @param outboundMessageConvertor
+ * The outboundMessageConvertor to set.
*/
- public void setOutboundMessageConvertor(JmsMesageConvertor
outboundMessageConvertor){
- this.outboundMessageConvertor=outboundMessageConvertor;
+ public void setOutboundMessageConvertor(JmsMesageConvertor
outboundMessageConvertor) {
+ this.outboundMessageConvertor = outboundMessageConvertor;
}
/**
* @return Returns the replyToDestinationCacheSize.
*/
- public int getReplyToDestinationCacheSize(){
+ public int getReplyToDestinationCacheSize() {
return replyToDestinationCacheSize;
}
@@ -212,90 +219,95 @@
* @param replyToDestinationCacheSize
* The replyToDestinationCacheSize to set.
*/
- public void setReplyToDestinationCacheSize(int
replyToDestinationCacheSize){
- this.replyToDestinationCacheSize=replyToDestinationCacheSize;
+ public void setReplyToDestinationCacheSize(int
replyToDestinationCacheSize) {
+ this.replyToDestinationCacheSize = replyToDestinationCacheSize;
}
-
-
+
/**
* @return Returns the localPassword.
*/
- public String getLocalPassword(){
+ public String getLocalPassword() {
return localPassword;
}
/**
- * @param localPassword The localPassword to set.
+ * @param localPassword
+ * The localPassword to set.
*/
- public void setLocalPassword(String localPassword){
- this.localPassword=localPassword;
+ public void setLocalPassword(String localPassword) {
+ this.localPassword = localPassword;
}
/**
* @return Returns the localUsername.
*/
- public String getLocalUsername(){
+ public String getLocalUsername() {
return localUsername;
}
/**
- * @param localUsername The localUsername to set.
+ * @param localUsername
+ * The localUsername to set.
*/
- public void setLocalUsername(String localUsername){
- this.localUsername=localUsername;
+ public void setLocalUsername(String localUsername) {
+ this.localUsername = localUsername;
}
/**
* @return Returns the outboundPassword.
*/
- public String getOutboundPassword(){
+ public String getOutboundPassword() {
return outboundPassword;
}
/**
- * @param outboundPassword The outboundPassword to set.
+ * @param outboundPassword
+ * The outboundPassword to set.
*/
- public void setOutboundPassword(String outboundPassword){
- this.outboundPassword=outboundPassword;
+ public void setOutboundPassword(String outboundPassword) {
+ this.outboundPassword = outboundPassword;
}
/**
* @return Returns the outboundUsername.
*/
- public String getOutboundUsername(){
+ public String getOutboundUsername() {
return outboundUsername;
}
/**
- * @param outboundUsername The outboundUsername to set.
+ * @param outboundUsername
+ * The outboundUsername to set.
*/
- public void setOutboundUsername(String outboundUsername){
- this.outboundUsername=outboundUsername;
+ public void setOutboundUsername(String outboundUsername) {
+ this.outboundUsername = outboundUsername;
}
- protected void addInboundBridge(DestinationBridge bridge){
+ protected void addInboundBridge(DestinationBridge bridge) {
inboundBridges.add(bridge);
}
-
- protected void addOutboundBridge(DestinationBridge bridge){
+
+ protected void addOutboundBridge(DestinationBridge bridge) {
outboundBridges.add(bridge);
}
- protected void removeInboundBridge(DestinationBridge bridge){
+
+ protected void removeInboundBridge(DestinationBridge bridge) {
inboundBridges.add(bridge);
}
-
- protected void removeOutboundBridge(DestinationBridge bridge){
+
+ protected void removeOutboundBridge(DestinationBridge bridge) {
outboundBridges.add(bridge);
}
public String getName() {
- if( name == null ) {
- name = "Connector:"+getNextId();
+ if (name == null) {
+ name = "Connector:" + getNextId();
}
return name;
}
-
+
static int nextId;
+
static private synchronized int getNextId() {
return nextId++;
}
@@ -303,4 +315,6 @@
public void setName(String name) {
this.name = name;
}
+
+ public abstract void restartProducerConnection() throws NamingException,
JMSException;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java?rev=436752&r1=436751&r2=436752&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsQueueConnector.java
Fri Aug 25 04:01:30 2006
@@ -17,6 +17,9 @@
*/
package org.apache.activemq.network.jms;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -26,9 +29,6 @@
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.NamingException;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
/**
* A Bridge to other JMS Queue providers
*
@@ -186,7 +186,11 @@
this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
}
-
+ public void restartProducerConnection() throws NamingException,
JMSException {
+ outboundQueueConnection = null;
+ initializeForeignQueueConnection();
+ }
+
protected void initializeForeignQueueConnection() throws
NamingException,JMSException{
if(outboundQueueConnection==null){
// get the connection factories
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java?rev=436752&r1=436751&r2=436752&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/JmsTopicConnector.java
Fri Aug 25 04:01:30 2006
@@ -187,7 +187,11 @@
this.outboundTopicConnectionFactory=foreignTopicConnectionFactory;
}
-
+
+ public void restartProducerConnection() throws NamingException,
JMSException {
+ outboundTopicConnection = null;
+ initializeForeignTopicConnection();
+ }
protected void initializeForeignTopicConnection() throws
NamingException,JMSException{
if(outboundTopicConnection==null){
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java?rev=436752&r1=436751&r2=436752&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/QueueBridge.java
Fri Aug 25 04:01:30 2006
@@ -56,12 +56,10 @@
}
}
-
protected MessageConsumer createConsumer() throws JMSException{
// set up the consumer
consumerSession=consumerConnection.createQueueSession(false,Session.CLIENT_ACKNOWLEDGE);
-
producerSession=producerConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer=null;
if(selector!=null&&selector.length()>0){
@@ -74,6 +72,7 @@
}
protected MessageProducer createProducer() throws JMSException{
+
producerSession=producerConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
producer = producerSession.createSender(null);
return producer;
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
URL:
http://svn.apache.org/viewvc/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java?rev=436752&r1=436751&r2=436752&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/jms/TopicBridge.java
Fri Aug 25 04:01:30 2006
@@ -60,7 +60,6 @@
protected MessageConsumer createConsumer() throws JMSException{
// set up the consumer
consumerSession=consumerConnection.createTopicSession(false,Session.CLIENT_ACKNOWLEDGE);
-
producerSession=producerConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
MessageConsumer consumer=null;
if(consumerName!=null&&consumerName.length()>0){
if(selector!=null&&selector.length()>0){
@@ -81,6 +80,7 @@
protected MessageProducer createProducer() throws JMSException{
+
producerSession=producerConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
producer = producerSession.createPublisher(null);
return producer;
}