Actually it didn't end up being the solution to the problem in my
application :(

However it did lead me to realise that the problem is related to synchronous
messages and specifically i think it may be the temporary queues used to do
the acknowledging.

I have hurriedly created a junit test case that causes my broker to grow in
size untill i get an error message on the client like this:

javax.jms.JMSException: The transport tcp://localhost/127.0.0.1:61616 of
type: org.apache.activemq.transport.tcp.TcpTransport is not running.
        at
org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:57)
        at
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1122)
        at
org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1200)
        at
org.apache.activemq.ActiveMQConnection.createSession(ActiveMQConnection.java:271)
        at
com.vieo.ccs.activemq.AMQSynchronousTest$1.run(AMQSynchronousTest.java:82)
Caused by: java.io.IOException: The transport
tcp://localhost/127.0.0.1:61616 of type:
org.apache.activemq.transport.tcp.TcpTransport is not running.
        at
org.apache.activemq.transport.TransportSupport.checkStarted(TransportSupport.java:108)
        at
org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:123)
        at
org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:141)
        at
org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:78)
        at
org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:77)
        at
org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
        at
org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:68)
        at
org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:73)
        at
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1112)

I was running the broker with its JVM memory size limited to 256M (even
though i saw it grow to 270??) to save a bit of time. When i ran it with
512M i once saw everything stop for a while then suddenly the brokers jvm
size dropped to abot 10M and i saw it log into the database again and then
everything started working again???

Here is the test case:



import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.TemporaryQueue;
import javax.jms.TextMessage;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.management.MBeanServer;
import javax.management.ObjectName;


import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.command.*;
import org.apache.log4j.Logger;

import com.vieo.ccs.common.messages.TestMessage;
import com.vieo.ccs.common.queue.AMQQueueConsumer;
import com.vieo.ccs.common.queue.CommunicationException;

import junit.framework.TestCase;

public class AMQSynchronousTest extends TestCase {
        static Logger logger = Logger.getLogger("AMQStressTest");

        
        
 public void testAMQSendReceive(){      
        
        
        final String connectionString = "tcp://localhost:61616";
         

    ActiveMQConnectionFactory amqFactory = null;
    
    Destination destination = null;
    long counter=0;
    
    try {
                
        // get amq connection factory
        amqFactory =  new ActiveMQConnectionFactory(connectionString);
    
        while(true){
            
                counter++;
                logger.info("iteration: "+counter);
                
                amqFactory =  new ActiveMQConnectionFactory(connectionString);
                
                ActiveMQConnection producerConnection ;
            
            
            Session producerSession ;
           
            MessageProducer producer;
                
            
            producerConnection =
(ActiveMQConnection)amqFactory.createConnection();
                producerConnection.setCopyMessageOnSend(false);
                producerSession = producerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
                if (destination==null){
                        destination =  
producerSession.createQueue("MemoryLeakQueue");
                }
                producer = producerSession.createProducer(destination);
                producerConnection.start();
            
            
            Thread thread = new Thread() {
                                @Override
                                public void run() {
                                        try {
                                                ActiveMQConnectionFactory 
amqFactory2 =  new
ActiveMQConnectionFactory(connectionString);
                                                ActiveMQConnection 
consumerConnection =
(ActiveMQConnection)amqFactory2.createConnection();
                                        Session consumerSession = 
consumerConnection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
                                        
                                        // Not sure if this is part of the 
problem - normally a jndi lookup
                                        //ActiveMQQueue queue = new 
ActiveMQQueue("MemoryLeakQueue");
                                        Destination queue = 
consumerSession.createQueue("MemoryLeakQueue"); 
                                        
                                        
                                        MessageConsumer consumer = 
consumerSession.createConsumer(queue);    
                                        consumerConnection.start();
                                
                                                TextMessage msg = 
(TextMessage)consumer.receive(0);
                                                
                                                assertTrue( 
msg.getText().equals("Test Message") );
                                                
                                                                                
                
                                                Destination tempQueue = 
(Destination)msg.getJMSReplyTo();
                                                MessageProducer prod = 
consumerSession.createProducer(tempQueue);
                                                
                                                TextMessage reply = 
consumerSession.createTextMessage("Reply");
                                                
                                                prod.send(reply);
                                                
                                                tempQueue = null;
                                                prod.close();
                                                prod=null;
                                                consumerSession.close();
                                                consumerSession=null;
                                                consumerConnection.close();
                                                consumerConnection=null;
                                                
                                                amqFactory2= null;
                                                
                                        } catch (JMSException e) {
                                                e.printStackTrace();
                                                fail();
                                        }

                                        
                                }
                        };
                        thread.start();
           
                TemporaryQueue tempQueue = 
producerSession.createTemporaryQueue();
                        MessageConsumer tempConsumer = 
producerSession.createConsumer(tempQueue);
                        
                // Create and send the message
                        
                        TextMessage message= 
producerSession.createTextMessage("Test Message");
                message.setJMSReplyTo(tempQueue);
                producer.send(message);
                
                // Wait for the reply
                Message reply = tempConsumer.receive(0);
                // Delete the temporary consumer and queue
                
                try {
                                thread.join();
                        } catch (InterruptedException e) {
                                e.printStackTrace();
                        }
                
                
                tempConsumer.close();
                tempQueue.delete();
                
                producerConnection.stop();
                producer.close();
                        producer = null;
                        producerSession.close();
                        producerSession = null;
                        producerConnection.close();
                        producerConnection = null;
                        message = null;
                        amqFactory = null;
                        thread = null;
                        
                        System.gc();

        }
        
    } catch (JMSException e) {
        logger.error("JMS Exception occurred: " + e.getMessage());
        fail();
    
        } catch (Exception e){
                e.printStackTrace();
                fail();
        }
        
 }
        
}


I was a bit rushed so there may be a stupid error in there that is causing
the broker error. 

ps I was running this on a windows machine - not sure if this is relevent
but i did notice a lot of connections in a TIME_WAIT state when i did a
netstat while the test was running.


-- 
View this message in context: 
http://www.nabble.com/AMQ-4.0.1-Memory-Leak-Hang-tf1889180.html#a5197284
Sent from the ActiveMQ - User forum at Nabble.com.

Reply via email to