Hi James,

well, it doesn't depend on the prefetch value (I have tried with 1000
and 2000 prefetched messages, and without prefetching at all). What
happens is this:
1) Create about 60 producers and 60 consumers (I have attached the
small and ugly test I used; basically I forked about 60 processes
on each side, so each had its own session and connection).
2) If you cut the shutdown hook out of this test (it handles cleanup
of connections and sessions) and kill a large number of
producers/consumers, the queue gets blocked, in the sense that
neither producers nor consumers can send or receive messages on it.
Sometimes this lasts 20-30s, and very often it is for good (>10h is
the longest I've left it before restarting the broker).
3) With the shutdown hook properly closing sessions and connections,
this is very hard to reproduce. On 4.0.0 I managed to block the queue
by repeatedly starting/killing a large number of producers and
consumers, but I've never managed to reproduce it on 4.0.1.
4) I could see a large number of hanging connections (the JMX console
showed many more connections than there actually were).
5) After restarting the broker, everything works like a charm.
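
For anyone who wants to repeat steps 1) and 2) without forking by
hand, here is a minimal launcher sketch. It is not part of the
attached test. The broker URL, the subject name, the 30s wait and the
class name KillTestLauncher are assumptions for illustration; the jar
and main class come from the usage() text of the attached test, and
the ActiveMQ jars would also have to be on the classpath. On
Unix-like systems destroyForcibly() typically kills the JVM without
running shutdown hooks, which approximates the "no shutdown hook"
case.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Sketch only: forks client JVMs running the attached test, then kills them hard.
class KillTestLauncher {

  // Builds the command line for one client; the mode ("producer" or "consumer") must come last.
  static List<String> buildCommand(String mode, String url, String subject) {
    List<String> cmd = new ArrayList<>();
    cmd.add("java");
    cmd.add("-cp");
    cmd.add("testModules.jar");          // assumption: ActiveMQ jars are appended here as well
    cmd.add("test.ActivemqSafeJMSTest");
    cmd.add("--persistent");
    cmd.add("--url");
    cmd.add(url);
    cmd.add("--subject");
    cmd.add(subject);
    cmd.add("--messages");
    cmd.add("0");                        // 0 makes the producer's send loop run until killed
    cmd.add(mode);
    return cmd;
  }

  public static void main(String[] args) throws IOException, InterruptedException {
    int clientsPerSide = 60;
    String url = "tcp://localhost:61616"; // assumed broker address
    String subject = "TEST.QUEUE";        // hypothetical queue name

    List<Process> processes = new ArrayList<>();
    for (int i = 0; i < clientsPerSide; i++) {
      processes.add(new ProcessBuilder(buildCommand("producer", url, subject)).inheritIO().start());
      processes.add(new ProcessBuilder(buildCommand("consumer", url, subject)).inheritIO().start());
    }

    // Let the clients exchange messages for a while (arbitrary duration).
    Thread.sleep(30_000);

    // Kill every client without giving it a chance to close its session or connection.
    for (Process p : processes) {
      p.destroyForcibly();
    }
  }
}
```

After the kill, the connection count in the JMX console can be
compared with the number of clients that are really still alive.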

Thanks,
Igor

On 7/3/06, James Strachan <[EMAIL PROTECTED]> wrote:
By 'blocking the queue' do you just mean messages were not dispatched
to consumers? If so, how many messages were on the queue, how many
connections & consumers did the broker think were open, & what were
the prefetch values? If you are having trouble with dead connections
not being closed down aggressively enough, then your issue could just
be an effect of messages being placed into prefetch buffers for
clients that have not been detected as failed yet. Do you have any
kind of JUnit test case that can replicate your issue?
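
One way to test the prefetch-buffer explanation is to run the clients
with a very small prefetch and a short inactivity timeout. The sketch
below only builds such a connection URL. It assumes the ActiveMQ URI
options jms.prefetchPolicy.queuePrefetch and
wireFormat.maxInactivityDuration are supported by the version in use
(worth checking against its documentation); the class name
BrokerUrlSketch and the values are illustrative.

```java
// Sketch only: appends prefetch and inactivity options to a broker URL.
class BrokerUrlSketch {

  // Returns baseUrl with the two options appended as query parameters.
  static String withOptions(String baseUrl, int queuePrefetch, long maxInactivityMillis) {
    String separator = baseUrl.contains("?") ? "&" : "?";
    return baseUrl + separator
        + "jms.prefetchPolicy.queuePrefetch=" + queuePrefetch
        + "&wireFormat.maxInactivityDuration=" + maxInactivityMillis;
  }

  public static void main(String[] args) {
    // prints tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1&wireFormat.maxInactivityDuration=30000
    System.out.println(withOptions("tcp://localhost:61616", 1, 30000L));
  }
}
```

The resulting string can be passed to the test's --url option. If the
queue still blocks with a prefetch of 1, prefetch buffers alone are
unlikely to be the cause.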

On 7/3/06, Igor Bogicevic <[EMAIL PROTECTED]> wrote:
> I am using ActiveMQ 4.0.1 (I ran tests on both 4.0.0 and 4.0.1)
> and I have attached activemq.xml (basically, almost out of the box
> configuration). I am using persistent queues, and I have tried with
> prefetching and transactions, but that didn't really change the
> behavior.
>
> Thanks,
> Igor
>
> On 7/3/06, James Strachan <[EMAIL PROTECTED]> wrote:
> > On 6/30/06, Igor Bogicevic <[EMAIL PROTECTED]> wrote:
> > > I also forgot to answer on this thread... this problem was not
> > > related to a lack of memory; it is related to hanging
> > > connections. I've set up the broker with a large amount of memory
> > > (>2GB), checked both in the broker configuration and via jconsole,
> > > and the problem was definitely in connections which were not
> > > cleaned up, for a reason unknown to me. It was easy to block the
> > > queue with a large number of improperly closed sessions, and
> > > relatively hard with connections properly closed in a shutdown hook.
> >
> > Using what version and what quality of service? e.g. are we talking
> > 4.0.1 and non-persistent queues? What was your XML config file?
> >
> > --
> >
> > James
> > -------
> > http://radio.weblogs.com/0112098/
> >
>
>
>


--

James
-------
http://radio.weblogs.com/0112098/

package test;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.util.IndentPrinter;

import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.DeliveryMode;
import javax.jms.Session;
import javax.jms.ExceptionListener;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.TextMessage;
import javax.jms.Message;
import javax.jms.Topic;

import java.io.IOException;
import java.util.Random;
import java.util.Date;

// ActivemqSafeJMSTest class

public class ActivemqSafeJMSTest implements MessageListener, ExceptionListener 
{
  protected Destination destination;
  protected String subject = "";
  protected String url = "";
  // properties
  protected boolean topic = false;
  protected boolean transacted = false;
  protected boolean durable = false;
  protected boolean persistent = false;
  protected boolean verbose = false;
  private long sleepTime=0L;
  protected int ackMode = Session.CLIENT_ACKNOWLEDGE;
  // identification
  protected String user = ActiveMQConnection.DEFAULT_USER;
  protected String pwd = ActiveMQConnection.DEFAULT_PASSWORD;
  protected String clientID = "";
  protected String consumerName = "";

  // consumer
  protected int count = 0;
  protected int dumpCount = 20000;
  protected int maxiumMessages = 0;
  private boolean pauseBeforeShutdown;
  private boolean running;
  private Session globSession;
  private Connection globConnection;
  private long receiveTimeOut=0;

  // producer
  protected int messageSize = 1000;
  protected int messageCount = 1000;
  protected long timeToLive = 0L;

  public ActivemqSafeJMSTest() {
    Runtime.getRuntime().addShutdownHook(new Thread() {  
      public void run(){
        try {
          if(globSession != null) 
            globSession.close();
          if(globConnection != null)
            globConnection.close(); 

          System.out.println("Shutting down nicely.");
        }
        catch (JMSException e) {
          System.out.println("Failed to shutdown nicely.");
        }
      }
    });

  }

  public static void main(String[] args) {
    int count = 0;
    String mode = "";
    ActivemqSafeJMSTest newTest = new ActivemqSafeJMSTest();

    for (int i = 0; i < args.length; i++) {  
      // properties
      if (args[i].matches("--durable")) {
        newTest.durable = true;
      }
      else if (args[i].matches("--transacted")) {
        newTest.transacted = true;
      }
      else if (args[i].matches("--persistent")) {
        newTest.persistent = true;
      }
      else if (args[i].matches("--topic")) {
        newTest.topic = true;
      }
      else if (args[i].matches("--verbose")) {
        newTest.verbose = true;
      }
      // values
      else if (args[i].matches("--clientid")) {
        if (i < args.length - 1) {
          newTest.clientID = args[++i];
        }
        else {
          usage();
        }
      }
      else if (args[i].matches("--consumername")) {
        if (i < args.length - 1) {
          newTest.consumerName = args[++i];
        }
        else {
          usage();
        }
      }
      else if (args[i].matches("--subject")) {
        if (i < args.length - 1) {
          newTest.subject = args[++i];
        }
        else {
          usage();
        }
      }
      else if (args[i].matches("--url")) {
        if (i < args.length - 1) {
          newTest.url = args[++i];
        }
        else {
          usage();
        }
      }
      else if (args[i].matches("--size")) {
        if (i < args.length - 1) {
          try {
            newTest.messageSize = Integer.parseInt(args[++i]);
          } catch (NumberFormatException nx) {
            System.out.println("bad integer...");
            System.exit(1);
          }
        }
        else {
          usage();
        }
      }
      else if (args[i].matches("--messages")) {
        if (i < args.length - 1) {
          try {
            newTest.messageCount = Integer.parseInt(args[++i]);
          } catch (NumberFormatException nx) {
            System.out.println("bad integer...");
            System.exit(1);
          }
        }
        else {
          usage();
        }
      }
      // mode
      else if (i == args.length - 1) {
        mode = args[i];
      }
      // unrecognized argument
      else {
        usage();
      }
    }

    // randomize consumer name and client id if it's not being set - aka give mojo
    if (newTest.consumerName.matches("")) {
      Random randomizer = new Random();
      newTest.consumerName = Long.toString(Math.abs(randomizer.nextLong()), 36);
    }
    if (newTest.clientID.matches("")) {
      Random randomizer = new Random();
      newTest.clientID = Long.toString(Math.abs(randomizer.nextLong()), 36);
    }

    // run test
    if (mode.equals("consumer")) {
      newTest.consume();
    }
    else if (mode.equals("producer")) {
      newTest.produce();
    }
  }

  //                //
  // ** GENERIC  ** //
  //                //


  protected Session createSession(Connection connection) throws Exception {
    Session session = connection.createSession(transacted, ackMode);
    if (topic) {
      destination = session.createTopic(subject);
    }
    else {
      destination = session.createQueue(subject);
    }
    return session;
  }

  protected Connection createConnection() throws JMSException, Exception {
    ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, pwd, url);
    Connection connection = connectionFactory.createConnection();
    if (durable && clientID!=null) {
      connection.setClientID(clientID);
    }
    
    connection.start();
    return connection;
  }

  protected void close(Connection connection, Session session) throws JMSException {
    // dump the stats first; guarded so a null connection cannot cause an NPE
    if (connection != null) {
      dumpStats(connection);
    }

    if (session != null) {
      session.close();
    }
    if (connection != null) {
      connection.close();
    }
  }

  protected void dumpStats(Connection connection) {
    ActiveMQConnection c = (ActiveMQConnection) connection;
    c.getConnectionStats().dump(new IndentPrinter());
  }

  //                //
  // ** CONSUMER ** //
  //                //

  private void consume() {
    try {
      running = true;
            
      System.out.println("Connecting to URL: " + url);
      System.out.println("Consuming " + (topic ? "topic" : "queue") + ": " + subject);
      System.out.println("Using " + (durable ? "durable" : "non-durable") + " subscription");
      System.out.println("Using " + (persistent ? "persistent" : "non-persistent") + " queue");

      globConnection = createConnection();
      globConnection.setExceptionListener(this);
      globSession = createSession(globConnection);
      MessageConsumer consumer = null;
      if (durable && topic) {
        consumer = globSession.createDurableSubscriber((Topic) destination, consumerName);
      }
      else {
        consumer = globSession.createConsumer(destination);
      }
      if ( maxiumMessages > 0 ) {
        consumeMessagesAndClose(globConnection, globSession, consumer);
      } else  {
        if(receiveTimeOut==0) {
          consumer.setMessageListener(this);
        } else {
          consumeMessagesAndClose(globConnection, globSession, consumer, receiveTimeOut);
        }
      }
    }
    catch (Exception e) {
      System.out.println("Caught while starting: " + e);
      e.printStackTrace();
    } 
  }

  public void onMessage(Message message) {
    try {
      if (message instanceof TextMessage) {
        TextMessage txtMsg = (TextMessage) message;
        if (verbose) {

          String msg = txtMsg.getText();
          if (msg.length() > 50) {
            msg = msg.substring(0, 50) + "...";
          }

          System.out.println("Received: " + msg);
        }
      }
      else {
        if (verbose) {
          System.out.println("Received: " + message);
        }
      }
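      if (transacted) {
        // a transacted session acknowledges its messages on commit
        globSession.commit();
      }
      else {
        // CLIENT_ACKNOWLEDGE mode: acknowledge explicitly
        message.acknowledge();
      }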
    }
    catch (JMSException e) {
      System.out.println("Caught: " + e);
      e.printStackTrace();
    } finally {
      if( sleepTime> 0 ) {
        try {
          Thread.sleep(sleepTime);
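        } catch (InterruptedException e) {
          // restore the interrupt flag instead of swallowing the interruption
          Thread.currentThread().interrupt();
        }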
      }
    }
  }

  protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer) throws JMSException, IOException {
    System.out.println("We are about to wait until we consume: " + maxiumMessages + " message(s) then we will shutdown");

    for (int i = 0; i < maxiumMessages && isRunning(); ) {
      Message message = consumer.receive(1000);
      if( message!=null ) {
        i++;
        onMessage(message);
      }
    }
    
    System.out.println("Closing connection");
    consumer.close();
    session.close();
    connection.close();

    consumer = null;
    session = null;
    connection = null;

    if (pauseBeforeShutdown) {
      System.out.println("Press return to shut down");
      System.in.read();
    }
  }
    
  protected void consumeMessagesAndClose(Connection connection, Session session, MessageConsumer consumer, long timeout) throws JMSException, IOException {
    System.out.println("We will consume messages while they continue to be delivered within: " + timeout + " ms, and then we will shutdown");

    Message message;
    while ( (message = consumer.receive(timeout)) != null ) {
      onMessage(message);
    }
        
    System.out.println("Closing connection");
    consumer.close();
    session.close();
    connection.close();

    consumer = null;
    session = null;
    connection = null;

    if (pauseBeforeShutdown) {
      System.out.println("Press return to shut down");
      System.in.read();
    }
 }


  synchronized public void onException(JMSException ex) {
    System.out.println("JMS Exception occurred.  Shutting down client.");
    running=false;
  }

  synchronized boolean isRunning() {
    return running;
  }

  //                //
  // ** PRODUCER ** //
  //                //

  private void produce() {
    try {
      System.out.println("Connecting to URL: " + url);
      System.out.println("Publishing a Message with size " + messageSize + " to " + (topic ? "topic" : "queue") + ": " + subject);
      System.out.println("Using " + (durable ? "durable" : "non-durable") + " publishing");
  
      globConnection = createConnection();
      globSession = createSession(globConnection);
      MessageProducer producer = createProducer(globSession);
      sendLoop(globSession, producer);

      System.out.println("Done.");
      close(globConnection, globSession); 
    }
    catch (Exception e) {
      System.out.println("Caught: " + e);
      e.printStackTrace();
    }
  }

  protected void sendLoop(Session session, MessageProducer producer) throws Exception {
    for (int i = 0; i < messageCount || messageCount==0 ; i++) {
      TextMessage message = session.createTextMessage(createMessageText(i));

      if (verbose) {
        String msg = message.getText();
        if (msg.length() > 50) {
          msg = msg.substring(0, 50) + "...";
        }
        System.out.println("Sending message: " + msg);
      }
            
      producer.send(message);
      if(transacted) {
        session.commit();
      }
            
      Thread.sleep(sleepTime);
            
    }
  }

  protected MessageProducer createProducer(Session session) throws JMSException {
    MessageProducer producer = session.createProducer(destination);
    if ((durable && topic) || (persistent && !topic)) {
      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
    }
    else {
      producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    }
    if( timeToLive!=0 )
      producer.setTimeToLive(timeToLive);
  
    return producer;
  }


  private String createMessageText(int index) {
    StringBuffer buffer = new StringBuffer(messageSize);
    buffer.append("Message: " + index + " sent at: " + new Date());
    if (buffer.length() > messageSize) {
      return buffer.substring(0, messageSize);
    }
    for (int i = buffer.length(); i < messageSize; i++) {
      buffer.append(' ');
    }
    return buffer.toString();
  }

  //            //
  // ** UTIL ** //
  //            //

  private static void usage() {
    System.out.println("usage : java -cp testModules.jar test.ActivemqSafeJMSTest [options] mode");
    System.out.println("options:                      ");
    System.out.println("\t--durable                   ");
    System.out.println("\t--transacted                ");
    System.out.println("\t--persistent                ");
    System.out.println("\t--topic                     ");
    System.out.println("\t--verbose                   ");
    System.out.println("\t--clientid clientID         ");
    System.out.println("\t--consumername consumerName ");
    System.out.println("\t--subject subject           ");
    System.out.println("\t--size size                 ");
    System.out.println("\t--messages messageCount     ");
    System.out.println("\t--url URL                   ");
    
    System.exit(1);
  }
}
