[ 
https://issues.apache.org/activemq/browse/AMQ-1228?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#action_40117
 ] 

Sunny Liu commented on AMQ-1228:
--------------------------------

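I have experienced the same problem with an OpenJMS connection. I have fixed
it in org.apache.activemq.network.jms.JmsQueueConnector and
org.apache.activemq.network.jms.JmsTopicConnector: both now implement
ExceptionListener and, when the foreign connection fails, re-initialize the
connector and retry every 5 seconds until the foreign server is reachable
again. I have tested the change and it works fine for me.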
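
The retry idea on its own, stripped of all JMS types, looks roughly like the
sketch below. This is only an illustration and not part of the patch: the
class name ReconnectSketch and the method retryUntilSuccess are made up for
this example, the BooleanSupplier stands in for reInit(), and stopping when
the thread is interrupted is an assumption of the sketch.

```java
import java.util.function.BooleanSupplier;

// Hypothetical helper, not part of ActiveMQ: retries an operation until it succeeds.
class ReconnectSketch {

    /**
     * Calls attempt until it returns true, sleeping delayMillis after each failure.
     * Returns the number of attempts made, or -1 if the thread was interrupted.
     */
    static int retryUntilSuccess(BooleanSupplier attempt, long delayMillis) {
        int attempts = 0;
        while (true) {
            attempts++;
            if (attempt.getAsBoolean()) {
                return attempts;
            }
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop retrying.
                Thread.currentThread().interrupt();
                return -1;
            }
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        // Stand-in for reInit(): fails twice, then succeeds on the third call.
        int attempts = retryUntilSuccess(() -> ++calls[0] >= 3, 10L);
        System.out.println("reconnected after " + attempts + " attempts");
        // prints "reconnected after 3 attempts"
    }
}
```

In the connectors the delay is fixed at 5000 ms and the loop runs inside
onException().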

Here is the code.

/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network.jms;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicConnectionFactory;
import javax.jms.TopicSession;
import javax.naming.NamingException;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

/**
 * A Bridge to other JMS Topic providers
 * 
 * @org.apache.xbean.XBean
 * 
 * @version $Revision: 1.1.1.1 $
 */
public class JmsTopicConnector extends JmsConnector
implements ExceptionListener
{
    private static final Log log=LogFactory.getLog(JmsTopicConnector.class);
    private String outboundTopicConnectionFactoryName;
    private String localConnectionFactoryName;
    private TopicConnectionFactory outboundTopicConnectionFactory;
    private TopicConnectionFactory localTopicConnectionFactory;
    private TopicConnection outboundTopicConnection;
    private TopicConnection localTopicConnection;
    private InboundTopicBridge[] inboundTopicBridges;
    private OutboundTopicBridge[] outboundTopicBridges;
    
    public boolean init(){
        boolean result=super.init();
        if(result){
            try{
                initializeForeignTopicConnection();
                initializeLocalTopicConnection();
                initializeInboundJmsMessageConvertor();
                initializeOutboundJmsMessageConvertor();
                initializeInboundTopicBridges();
                initializeOutboundTopicBridges();
            }catch(Exception e){
                log.error("Failed to initialize the JMSConnector",e);
            }
        }
        return result;
    }   
    

    protected boolean reInit()
    {
        boolean ret = false;
        try{
            if(outboundTopicConnectionFactoryName!=null){
                this.outboundTopicConnection = null;
                this.outboundTopicConnectionFactory = null;
            }
            initializeForeignTopicConnection();
            initializeLocalTopicConnection();
            initializeInboundJmsMessageConvertor();
            initializeOutboundJmsMessageConvertor();
            initializeInboundTopicBridges();
            initializeOutboundTopicBridges();
            ret = true;
        }catch(Exception e){
            ret = false;
            log.error("Failed to initialize the JMSConnector",e);
        }
        return ret;
    }
    
    public void onException(JMSException jmsException)
    {
        log.warn("Connection to foreign server failed, trying to reconnect.", jmsException);
        // mark the connector as stopped so that start() can run again
        started.compareAndSet(true, false);
        boolean initSuccess = false;
        do{
            initSuccess = reInit();
            if(!initSuccess){
                log.warn("Still not able to connect to foreign server, waiting another 5 seconds before trying again.");
                try{
                    Thread.sleep(5000);
                }catch(InterruptedException e){
                    // restore the interrupt flag and stop retrying
                    Thread.currentThread().interrupt();
                    return;
                }
            }else{
                log.info("Reconnected to foreign server successfully.");
                try{
                    this.start();
                }catch(Exception e){
                    initSuccess = false;
                    log.warn("Failed to restart.", e);
                }
            }
        }while(!initSuccess);
    }
    
    /**
     * @return Returns the inboundTopicBridges.
     */
    public InboundTopicBridge[] getInboundTopicBridges(){
        return inboundTopicBridges;
    }

    /**
     * @param inboundTopicBridges
     *            The inboundTopicBridges to set.
     */
    public void setInboundTopicBridges(InboundTopicBridge[] inboundTopicBridges){
        this.inboundTopicBridges=inboundTopicBridges;
    }

    /**
     * @return Returns the outboundTopicBridges.
     */
    public OutboundTopicBridge[] getOutboundTopicBridges(){
        return outboundTopicBridges;
    }

    /**
     * @param outboundTopicBridges
     *            The outboundTopicBridges to set.
     */
    public void setOutboundTopicBridges(OutboundTopicBridge[] outboundTopicBridges){
        this.outboundTopicBridges=outboundTopicBridges;
    }

    /**
     * @return Returns the localTopicConnectionFactory.
     */
    public TopicConnectionFactory getLocalTopicConnectionFactory(){
        return localTopicConnectionFactory;
    }

    /**
     * @param localTopicConnectionFactory
     *            The localTopicConnectionFactory to set.
     */
    public void setLocalTopicConnectionFactory(TopicConnectionFactory localConnectionFactory){
        this.localTopicConnectionFactory=localConnectionFactory;
    }

    /**
     * @return Returns the outboundTopicConnectionFactory.
     */
    public TopicConnectionFactory getOutboundTopicConnectionFactory(){
        return outboundTopicConnectionFactory;
    }

    /**
     * @return Returns the outboundTopicConnectionFactoryName.
     */
    public String getOutboundTopicConnectionFactoryName(){
        return outboundTopicConnectionFactoryName;
    }

    /**
     * @param outboundTopicConnectionFactoryName
     *            The outboundTopicConnectionFactoryName to set.
     */
    public void setOutboundTopicConnectionFactoryName(String foreignTopicConnectionFactoryName){
        this.outboundTopicConnectionFactoryName=foreignTopicConnectionFactoryName;
    }

    /**
     * @return Returns the localConnectionFactoryName.
     */
    public String getLocalConnectionFactoryName(){
        return localConnectionFactoryName;
    }

    /**
     * @param localConnectionFactoryName
     *            The localConnectionFactoryName to set.
     */
    public void setLocalConnectionFactoryName(String localConnectionFactoryName){
        this.localConnectionFactoryName=localConnectionFactoryName;
    }

    /**
     * @return Returns the localTopicConnection.
     */
    public TopicConnection getLocalTopicConnection(){
        return localTopicConnection;
    }

    /**
     * @param localTopicConnection
     *            The localTopicConnection to set.
     */
    public void setLocalTopicConnection(TopicConnection localTopicConnection){
        this.localTopicConnection=localTopicConnection;
    }

    /**
     * @return Returns the outboundTopicConnection.
     */
    public TopicConnection getOutboundTopicConnection(){
        return outboundTopicConnection;
    }

    /**
     * @param outboundTopicConnection
     *            The outboundTopicConnection to set.
     */
    public void setOutboundTopicConnection(TopicConnection foreignTopicConnection){
        this.outboundTopicConnection=foreignTopicConnection;
    }

    /**
     * @param outboundTopicConnectionFactory
     *            The outboundTopicConnectionFactory to set.
     */
    public void setOutboundTopicConnectionFactory(TopicConnectionFactory foreignTopicConnectionFactory){
        this.outboundTopicConnectionFactory=foreignTopicConnectionFactory;
    }


    public void restartProducerConnection() throws NamingException, JMSException {
        outboundTopicConnection = null;
        initializeForeignTopicConnection();
    }

    protected void initializeForeignTopicConnection() throws NamingException,JMSException{
        if(outboundTopicConnection==null){
            // get the connection factories
            if(outboundTopicConnectionFactory==null){
                // look it up from JNDI
                if(outboundTopicConnectionFactoryName!=null){
                    outboundTopicConnectionFactory=(TopicConnectionFactory) jndiOutboundTemplate.lookup(
                            outboundTopicConnectionFactoryName,TopicConnectionFactory.class);
                    if(outboundUsername!=null){
                        outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(
                                outboundUsername,outboundPassword);
                    }else{
                        outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection();
                    }
                }else {
                    throw new JMSException("Cannot create foreign connection - no information");
                }
            }else {
                if(outboundUsername!=null){
                    outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection(
                            outboundUsername,outboundPassword);
                }else{
                    outboundTopicConnection=outboundTopicConnectionFactory.createTopicConnection();
                }
            }
        }
        outboundTopicConnection.start();
        // register this connector so that onException() is called when the
        // foreign connection fails (same as in JmsQueueConnector)
        outboundTopicConnection.setExceptionListener(this);
    }

    protected void initializeLocalTopicConnection() throws NamingException,JMSException{
        if(localTopicConnection==null){
            // get the connection factories
            if(localTopicConnectionFactory==null){
                if(embeddedConnectionFactory==null){
                    // look it up from JNDI
                    if(localConnectionFactoryName!=null){
                        localTopicConnectionFactory=(TopicConnectionFactory) jndiLocalTemplate.lookup(
                                localConnectionFactoryName,TopicConnectionFactory.class);
                        if(localUsername!=null){
                            localTopicConnection=localTopicConnectionFactory.createTopicConnection(
                                    localUsername,localPassword);
                        }else{
                            localTopicConnection=localTopicConnectionFactory.createTopicConnection();
                        }
                    }else {
                        throw new JMSException("Cannot create localConnection - no information");
                    }
                }else{
                    localTopicConnection = embeddedConnectionFactory.createTopicConnection();
                }
            }else {
                if(localUsername!=null){
                    localTopicConnection=localTopicConnectionFactory.createTopicConnection(
                            localUsername,localPassword);
                }else{
                    localTopicConnection=localTopicConnectionFactory.createTopicConnection();
                }
            }
        }
        localTopicConnection.start();
    }
    
    protected void initializeInboundJmsMessageConvertor(){
        inboundMessageConvertor.setConnection(localTopicConnection);
    }
    
    protected void initializeOutboundJmsMessageConvertor(){
        outboundMessageConvertor.setConnection(outboundTopicConnection);
    }

    protected void initializeInboundTopicBridges() throws JMSException{
        if(inboundTopicBridges!=null){
            TopicSession outboundSession = outboundTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            for(int i=0;i<inboundTopicBridges.length;i++){
                InboundTopicBridge bridge=inboundTopicBridges[i];
                String localTopicName=bridge.getLocalTopicName();
                Topic activemqTopic=createActiveMQTopic(localSession,localTopicName);
                String topicName=bridge.getInboundTopicName();
                Topic foreignTopic=createForeignTopic(outboundSession,topicName);
                bridge.setConsumerTopic(foreignTopic);
                bridge.setProducerTopic(activemqTopic);
                bridge.setProducerConnection(localTopicConnection);
                bridge.setConsumerConnection(outboundTopicConnection);
                if(bridge.getJmsMessageConvertor()==null){
                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                }
                bridge.setJmsConnector(this);
                addInboundBridge(bridge);
            }
            outboundSession.close();
            localSession.close();
        }
    }

    protected void initializeOutboundTopicBridges() throws JMSException{
        if(outboundTopicBridges!=null){
            TopicSession outboundSession = outboundTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            TopicSession localSession = localTopicConnection.createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
            for(int i=0;i<outboundTopicBridges.length;i++){
                OutboundTopicBridge bridge=outboundTopicBridges[i];
                String localTopicName=bridge.getLocalTopicName();
                Topic activemqTopic=createActiveMQTopic(localSession,localTopicName);
                String topicName=bridge.getOutboundTopicName();
                Topic foreignTopic=createForeignTopic(outboundSession,topicName);
                bridge.setConsumerTopic(activemqTopic);
                bridge.setProducerTopic(foreignTopic);
                bridge.setProducerConnection(outboundTopicConnection);
                bridge.setConsumerConnection(localTopicConnection);
                if(bridge.getJmsMessageConvertor()==null){
                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                }
                bridge.setJmsConnector(this);
                addOutboundBridge(bridge);
            }
            outboundSession.close();
            localSession.close();
        }
    }
    
    protected Destination createReplyToBridge(Destination destination,
            Connection replyToProducerConnection, Connection replyToConsumerConnection){
        Topic replyToProducerTopic =(Topic)destination;
        boolean isInbound = replyToProducerConnection.equals(localTopicConnection);

        if(isInbound){
            InboundTopicBridge bridge = (InboundTopicBridge) replyToBridges.get(replyToProducerTopic);
            if (bridge == null){
                bridge = new InboundTopicBridge(){
                    protected Destination processReplyToDestination(Destination destination){
                        return null;
                    }
                };
                try{
                    TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
                            .createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
                    Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
                    replyToConsumerSession.close();
                    bridge.setConsumerTopic(replyToConsumerTopic);
                    bridge.setProducerTopic(replyToProducerTopic);
                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
                    bridge.setDoHandleReplyTo(false);
                    if(bridge.getJmsMessageConvertor()==null){
                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    bridge.start();
                    log.info("Created replyTo bridge for " + replyToProducerTopic);
                }catch(Exception e){
                    log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
                    return null;
                }
                replyToBridges.put(replyToProducerTopic, bridge);
            }
            return bridge.getConsumerTopic();
        }else{
            OutboundTopicBridge bridge = (OutboundTopicBridge) replyToBridges.get(replyToProducerTopic);
            if (bridge == null){
                bridge = new OutboundTopicBridge(){
                    protected Destination processReplyToDestination(Destination destination){
                        return null;
                    }
                };
                try{
                    TopicSession replyToConsumerSession = ((TopicConnection)replyToConsumerConnection)
                            .createTopicSession(false,Session.AUTO_ACKNOWLEDGE);
                    Topic replyToConsumerTopic = replyToConsumerSession.createTemporaryTopic();
                    replyToConsumerSession.close();
                    bridge.setConsumerTopic(replyToConsumerTopic);
                    bridge.setProducerTopic(replyToProducerTopic);
                    bridge.setProducerConnection((TopicConnection)replyToProducerConnection);
                    bridge.setConsumerConnection((TopicConnection)replyToConsumerConnection);
                    bridge.setDoHandleReplyTo(false);
                    if(bridge.getJmsMessageConvertor()==null){
                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    bridge.start();
                    log.info("Created replyTo bridge for " + replyToProducerTopic);
                }catch(Exception e){
                    log.error("Failed to create replyTo bridge for topic: " + replyToProducerTopic, e);
                    return null;
                }
                replyToBridges.put(replyToProducerTopic, bridge);
            }
            return bridge.getConsumerTopic();
        }
    }
    
    protected Topic createActiveMQTopic(TopicSession session,String topicName) throws JMSException{
        return session.createTopic(topicName);
    }
    
    protected Topic createForeignTopic(TopicSession session,String topicName) throws JMSException{
        Topic result = null;
        try{
            result = session.createTopic(topicName);
        }catch(JMSException e){
            //look-up the Topic
            try{
                result = (Topic) jndiOutboundTemplate.lookup(topicName, Topic.class);
            }catch(NamingException e1){
                String errStr = "Failed to look-up Topic for name: " + topicName;
                log.error(errStr,e);
                JMSException jmsEx =  new JMSException(errStr);
                jmsEx.setLinkedException(e1);
                throw jmsEx;
            }
        }
        return result;
    }

    
}


///======================================================


/**
 *
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.activemq.network.jms;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.NamingException;
/**
 * A Bridge to other JMS Queue providers
 * 
 * @org.apache.xbean.XBean
 *
 * @version $Revision: 1.1.1.1 $
 */
public class JmsQueueConnector extends JmsConnector
implements ExceptionListener
{
    private static final Log log=LogFactory.getLog(JmsQueueConnector.class);
    private String outboundQueueConnectionFactoryName;
    private String localConnectionFactoryName;
    
    private QueueConnectionFactory outboundQueueConnectionFactory;
    private QueueConnectionFactory localQueueConnectionFactory;
    
    private QueueConnection outboundQueueConnection;
    private QueueConnection localQueueConnection;
    
    private InboundQueueBridge[] inboundQueueBridges;
    private OutboundQueueBridge[] outboundQueueBridges;   

    public boolean init(){
        boolean result=super.init();
        if(result){
            try{
                initializeForeignQueueConnection();
                initializeLocalQueueConnection();
                initializeInboundJmsMessageConvertor();
                initializeOutboundJmsMessageConvertor();
                initializeInboundQueueBridges();
                initializeOutboundQueueBridges();
            }catch(Exception e){                
                log.error("Failed to initialize the JMSConnector",e);
            }
        }
        return result;
    }   
    
    protected boolean reInit()
    {
        boolean ret = false;
        try{
            if(outboundQueueConnectionFactoryName!=null){
                this.outboundQueueConnection = null;
                this.outboundQueueConnectionFactory = null;
            }
            initializeForeignQueueConnection();
            initializeLocalQueueConnection();
            initializeInboundJmsMessageConvertor();
            initializeOutboundJmsMessageConvertor();
            initializeInboundQueueBridges();
            initializeOutboundQueueBridges();
            ret = true;
        }catch(Exception e){
            ret = false;
            log.error("Failed to initialize the JMSConnector",e);
        }
        return ret;
    }
    
    public void onException(JMSException jmsException)
    {
        log.warn("Connection to foreign server failed, trying to reconnect.", jmsException);
        // mark the connector as stopped so that start() can run again
        started.compareAndSet(true, false);
        boolean initSuccess = false;
        do{
            initSuccess = reInit();
            if(!initSuccess){
                log.warn("Still not able to connect to foreign server, waiting another 5 seconds before trying again.");
                try{
                    Thread.sleep(5000);
                }catch(InterruptedException e){
                    // restore the interrupt flag and stop retrying
                    Thread.currentThread().interrupt();
                    return;
                }
            }else{
                log.info("Reconnected to foreign server successfully.");
                try{
                    this.start();
                }catch(Exception e){
                    initSuccess = false;
                    log.warn("Failed to restart.", e);
                }
            }
        }while(!initSuccess);
    }
    
    /**
     * @return Returns the inboundQueueBridges.
     */
    public InboundQueueBridge[] getInboundQueueBridges(){
        return inboundQueueBridges;
    }

    /**
     * @param inboundQueueBridges
     *            The inboundQueueBridges to set.
     */
    public void setInboundQueueBridges(InboundQueueBridge[] inboundQueueBridges){
        this.inboundQueueBridges=inboundQueueBridges;
    }

    /**
     * @return Returns the outboundQueueBridges.
     */
    public OutboundQueueBridge[] getOutboundQueueBridges(){
        return outboundQueueBridges;
    }

    /**
     * @param outboundQueueBridges
     *            The outboundQueueBridges to set.
     */
    public void setOutboundQueueBridges(OutboundQueueBridge[] outboundQueueBridges){
        this.outboundQueueBridges=outboundQueueBridges;
    }

    /**
     * @return Returns the localQueueConnectionFactory.
     */
    public QueueConnectionFactory getLocalQueueConnectionFactory(){
        return localQueueConnectionFactory;
    }

    /**
     * @param localQueueConnectionFactory
     *            The localQueueConnectionFactory to set.
     */
    public void setLocalQueueConnectionFactory(QueueConnectionFactory localConnectionFactory){
        this.localQueueConnectionFactory=localConnectionFactory;
    }

    /**
     * @return Returns the outboundQueueConnectionFactory.
     */
    public QueueConnectionFactory getOutboundQueueConnectionFactory(){
        return outboundQueueConnectionFactory;
    }

    /**
     * @return Returns the outboundQueueConnectionFactoryName.
     */
    public String getOutboundQueueConnectionFactoryName(){
        return outboundQueueConnectionFactoryName;
    }

    /**
     * @param outboundQueueConnectionFactoryName
     *            The outboundQueueConnectionFactoryName to set.
     */
    public void setOutboundQueueConnectionFactoryName(String foreignQueueConnectionFactoryName){
        this.outboundQueueConnectionFactoryName=foreignQueueConnectionFactoryName;
    }

    /**
     * @return Returns the localConnectionFactoryName.
     */
    public String getLocalConnectionFactoryName(){
        return localConnectionFactoryName;
    }

    /**
     * @param localConnectionFactoryName
     *            The localConnectionFactoryName to set.
     */
    public void setLocalConnectionFactoryName(String localConnectionFactoryName){
        this.localConnectionFactoryName=localConnectionFactoryName;
    }

    /**
     * @return Returns the localQueueConnection.
     */
    public QueueConnection getLocalQueueConnection(){
        return localQueueConnection;
    }

    /**
     * @param localQueueConnection
     *            The localQueueConnection to set.
     */
    public void setLocalQueueConnection(QueueConnection localQueueConnection){
        this.localQueueConnection=localQueueConnection;
    }

    /**
     * @return Returns the outboundQueueConnection.
     */
    public QueueConnection getOutboundQueueConnection(){
        return outboundQueueConnection;
    }

    /**
     * @param outboundQueueConnection
     *            The outboundQueueConnection to set.
     */
    public void setOutboundQueueConnection(QueueConnection foreignQueueConnection){
        this.outboundQueueConnection=foreignQueueConnection;
    }

    /**
     * @param outboundQueueConnectionFactory
     *            The outboundQueueConnectionFactory to set.
     */
    public void setOutboundQueueConnectionFactory(QueueConnectionFactory foreignQueueConnectionFactory){
        this.outboundQueueConnectionFactory=foreignQueueConnectionFactory;
    }

    public void restartProducerConnection() throws NamingException, JMSException {
        outboundQueueConnection = null;
        initializeForeignQueueConnection();
        
    }

    protected void initializeForeignQueueConnection() throws NamingException,JMSException{
        if(outboundQueueConnection==null){
            // get the connection factories
            if(outboundQueueConnectionFactory==null){
                // look it up from JNDI
                if(outboundQueueConnectionFactoryName!=null){
                    outboundQueueConnectionFactory=(QueueConnectionFactory) jndiOutboundTemplate.lookup(
                            outboundQueueConnectionFactoryName,QueueConnectionFactory.class);
                    if(outboundUsername!=null){
                        outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(
                                outboundUsername,outboundPassword);
                    }else{
                        outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection();
                    }
                }else {
                    throw new JMSException("Cannot create foreign connection - no information");
                }
            }else {
                if(outboundUsername!=null){
                    outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection(
                            outboundUsername,outboundPassword);
                }else{
                    outboundQueueConnection=outboundQueueConnectionFactory.createQueueConnection();
                }
            }
        }
        outboundQueueConnection.start();
        outboundQueueConnection.setExceptionListener(this);
    }

    protected void initializeLocalQueueConnection() throws NamingException,JMSException{
        if(localQueueConnection==null){
            // get the connection factories
            if(localQueueConnectionFactory==null){
                if(embeddedConnectionFactory==null){
                    // look it up from JNDI
                    if(localConnectionFactoryName!=null){
                        localQueueConnectionFactory=(QueueConnectionFactory) jndiLocalTemplate.lookup(
                                localConnectionFactoryName,QueueConnectionFactory.class);
                        if(localUsername!=null){
                            localQueueConnection=localQueueConnectionFactory.createQueueConnection(
                                    localUsername,localPassword);
                        }else{
                            localQueueConnection=localQueueConnectionFactory.createQueueConnection();
                        }
                    }else {
                        throw new JMSException("Cannot create localConnection - no information");
                    }
                }else{
                    localQueueConnection = embeddedConnectionFactory.createQueueConnection();
                }
            }else {
                if(localUsername!=null){
                    localQueueConnection=localQueueConnectionFactory.createQueueConnection(
                            localUsername,localPassword);
                }else{
                    localQueueConnection=localQueueConnectionFactory.createQueueConnection();
                }
            }
        }
        localQueueConnection.start();
    }
    
    protected void initializeInboundJmsMessageConvertor(){
        inboundMessageConvertor.setConnection(localQueueConnection);
    }
    
    protected void initializeOutboundJmsMessageConvertor(){
        outboundMessageConvertor.setConnection(outboundQueueConnection);
    }

    protected void initializeInboundQueueBridges() throws JMSException{
        if(inboundQueueBridges!=null){
            QueueSession outboundSession = outboundQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            for(int i=0;i<inboundQueueBridges.length;i++){
                InboundQueueBridge bridge=inboundQueueBridges[i];
                String localQueueName=bridge.getLocalQueueName();
                Queue activemqQueue=createActiveMQQueue(localSession,localQueueName);

                String queueName = bridge.getInboundQueueName();
                Queue foreignQueue=createForeignQueue(outboundSession,queueName);
                bridge.setConsumerQueue(foreignQueue);
                bridge.setProducerQueue(activemqQueue);
                bridge.setProducerConnection(localQueueConnection);
                bridge.setConsumerConnection(outboundQueueConnection);
                if(bridge.getJmsMessageConvertor()==null){
                    bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                }
                bridge.setJmsConnector(this);
                addInboundBridge(bridge);
            }
            outboundSession.close();
            localSession.close();
        }
    }

    protected void initializeOutboundQueueBridges() throws JMSException{
        if(outboundQueueBridges!=null){
            QueueSession outboundSession = outboundQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            QueueSession localSession = localQueueConnection.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
            for(int i=0;i<outboundQueueBridges.length;i++){
                OutboundQueueBridge bridge=outboundQueueBridges[i];
                String localQueueName=bridge.getLocalQueueName();
                Queue activemqQueue=createActiveMQQueue(localSession,localQueueName);
                String queueName=bridge.getOutboundQueueName();
                Queue foreignQueue=createForeignQueue(outboundSession,queueName);
                bridge.setConsumerQueue(activemqQueue);
                bridge.setProducerQueue(foreignQueue);
                bridge.setProducerConnection(outboundQueueConnection);
                bridge.setConsumerConnection(localQueueConnection);
                if(bridge.getJmsMessageConvertor()==null){
                    bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                }
                bridge.setJmsConnector(this);
                addOutboundBridge(bridge);
            }
            outboundSession.close();
            localSession.close();
        }
    }
    
    protected Destination createReplyToBridge(Destination destination,
            Connection replyToProducerConnection, Connection replyToConsumerConnection){
        Queue replyToProducerQueue =(Queue)destination;
        boolean isInbound = replyToProducerConnection.equals(localQueueConnection);

        if(isInbound){
            InboundQueueBridge bridge = (InboundQueueBridge) replyToBridges.get(replyToProducerQueue);
            if (bridge == null){
                bridge = new InboundQueueBridge(){
                    protected Destination processReplyToDestination (Destination destination){
                        return null;
                    }
                };
                try{
                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
                    replyToConsumerSession.close();
                    bridge.setConsumerQueue(replyToConsumerQueue);
                    bridge.setProducerQueue(replyToProducerQueue);
                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
                    bridge.setDoHandleReplyTo(false);
                    if(bridge.getJmsMessageConvertor()==null){
                        bridge.setJmsMessageConvertor(getInboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    bridge.start();
                    log.info("Created replyTo bridge for " + replyToProducerQueue);
                }catch(Exception e){
                    log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
                    return null;
                }
                replyToBridges.put(replyToProducerQueue, bridge);
            }
            return bridge.getConsumerQueue();
        }else{
            OutboundQueueBridge bridge = (OutboundQueueBridge) replyToBridges.get(replyToProducerQueue);
            if (bridge == null){
                bridge = new OutboundQueueBridge(){
                    protected Destination processReplyToDestination (Destination destination){
                        return null;
                    }
                };
                try{
                    QueueSession replyToConsumerSession = ((QueueConnection)replyToConsumerConnection).createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
                    Queue replyToConsumerQueue = replyToConsumerSession.createTemporaryQueue();
                    replyToConsumerSession.close();
                    bridge.setConsumerQueue(replyToConsumerQueue);
                    bridge.setProducerQueue(replyToProducerQueue);
                    bridge.setProducerConnection((QueueConnection)replyToProducerConnection);
                    bridge.setConsumerConnection((QueueConnection)replyToConsumerConnection);
                    bridge.setDoHandleReplyTo(false);
                    if(bridge.getJmsMessageConvertor()==null){
                        bridge.setJmsMessageConvertor(getOutboundMessageConvertor());
                    }
                    bridge.setJmsConnector(this);
                    bridge.start();
                    log.info("Created replyTo bridge for " + replyToProducerQueue);
                }catch(Exception e){
                    log.error("Failed to create replyTo bridge for queue: " + replyToProducerQueue, e);
                    return null;
                }
                replyToBridges.put(replyToProducerQueue, bridge);
            }
            return bridge.getConsumerQueue();
        }
    }
    
    protected Queue createActiveMQQueue(QueueSession session,String queueName) throws JMSException{
        return session.createQueue(queueName);
    }

    protected Queue createForeignQueue(QueueSession session,String queueName) throws JMSException{
        Queue result = null;
        try{
            result = session.createQueue(queueName);
        }catch(JMSException e){
            //look-up the Queue
            try{
                result = (Queue) jndiOutboundTemplate.lookup(queueName, Queue.class);
            }catch(NamingException e1){
                String errStr = "Failed to look-up Queue for name: " + queueName;
                log.error(errStr,e);
                JMSException jmsEx =  new JMSException(errStr);
                jmsEx.setLinkedException(e1);
                throw jmsEx;
            }
        }
        return result;
    }

    
}
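
As an aside on the reconnect problem in the quoted report: one way the retry timing might be handled is a capped exponential backoff. The sketch below is not part of the patch and does not use the ActiveMQ or JMS API. ReconnectSketch, Connector, backoffMillis and connectWithRetry are hypothetical names, and Connector.connect() only stands in for whatever re-creates and starts the foreign connection.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class ReconnectSketch {

    /** Hypothetical stand-in for "re-create and start the foreign connection". */
    public interface Connector {
        void connect() throws Exception;
    }

    /** Delay before retry number 'attempt' (0-based): doubles each time, capped at maxMillis. */
    public static long backoffMillis(int attempt, long baseMillis, long maxMillis) {
        long delay = baseMillis;
        for (int i = 0; i < attempt && delay < maxMillis; i++) {
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    /** Calls connect() up to maxAttempts times; returns the attempts used, or -1 if all failed. */
    public static int connectWithRetry(Connector connector, int maxAttempts,
                                       long baseMillis, long maxMillis) throws InterruptedException {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                connector.connect();
                return attempt + 1;
            } catch (Exception e) {
                // Sleep only if another attempt will follow.
                if (attempt + 1 < maxAttempts) {
                    Thread.sleep(backoffMillis(attempt, baseMillis, maxMillis));
                }
            }
        }
        return -1;
    }

    public static void main(String[] args) throws Exception {
        AtomicInteger calls = new AtomicInteger();
        // Simulated remote broker that is "down" for the first two attempts.
        Connector flaky = () -> {
            if (calls.incrementAndGet() < 3) {
                throw new IllegalStateException("broker down");
            }
        };
        int used = connectWithRetry(flaky, 5, 10, 100);
        System.out.println("connected after " + used + " attempt(s)");
    }
}
```

In a connector, something like connectWithRetry would presumably be driven from the ExceptionListener callback on a separate thread, so that the JMS provider's notification thread is not blocked while waiting.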

> CLONE -JMS to JMS Bridge never reconnects under remote broker restarts and 
> connections are not closed.
> ------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-1228
>                 URL: https://issues.apache.org/activemq/browse/AMQ-1228
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 4.0 RC2, 4.0.1
>            Reporter: William MacDonald
>
> I'm using ActiveMQ (4.0.1) JMS to JMS Bridge functionality to connect to a  
> SunMQ JMS Broker (3.6 SP3  (Build 02-A)). I'm using two queues, an input and 
> an output one, with the following configuration:
>     <jmsBridgeConnectors>
>       <jmsQueueConnector outboundQueueConnectionFactory="#REMOTE">
>         <outboundQueueBridges>
>           <outboundQueueBridge outboundQueueName="SUNRECV"/>
>         </outboundQueueBridges>
>         <inboundQueueBridges>
>           <inboundQueueBridge inboundQueueName="SUNSEND"/>
>         </inboundQueueBridges>
>       </jmsQueueConnector>
>     </jmsBridgeConnectors>
> The system worked really well until the SunMQ broker needed to be restarted. 
> This is what I found:
> 1.-ActiveMQ is not aware of the remote broker shutdown. I waited for a while, 
> but no log on ActiveMQ indicates knowledge about the new situation.
> 2.-When I send a message to the output queue SUNRECV, ActiveMQ complains that 
> the producer is closed:
> [ERROR][2006/08/25.09:47:12.039][ActiveMQ Session Task]failed to forward 
> message: ActiveMQTextMessage {commandId = 5, responseRequired = false, 
> messageId = ID:trabucco-43457-1156491843149-3:4:1:1:1, originalDestination = 
> null, originalTransactionId = null, producerId = 
> ID:trabucco-43457-1156491843149-3:4:1:1, destination = queue://SUNRECV, 
> transactionId = null, expiration = 0, timestamp = 1156492032027, arrival = 0, 
> correlationId = null, replyTo = null, persistent = false, type = null, 
> priority = 0, groupID = null, groupSequence = 0, targetConsumerId = null, 
> compressed = false, userID = null, content = null, marshalledProperties = 
> null, dataStructure = null, redeliveryCounter = 0, size = 2, properties = 
> null, readOnlyProperties = true, readOnlyBody = true, text = 1}([C4064]: 
> Cannot perform operation, producer is closed.)
>  After this, it is automatically queueing messages without sending them, 
> showing the log:
> [DEBUG][2006/08/25.09:47:42.721][RMI TCP Connection(4)-10.95.89.20]No 
> subscriptions registered, will not dispatch message at this time.
>  Even if SunMQ is started again, ActiveMQ is not detecting the new situation, 
> and continues queueing messages sent to SUNRECV.
> Please let me know if more information is needed to understand the 
> situation.

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.
