Markus Hahn created AMQ-4109:
--------------------------------

             Summary: Negative queue counters
                 Key: AMQ-4109
                 URL: https://issues.apache.org/jira/browse/AMQ-4109
             Project: ActiveMQ
          Issue Type: Bug
    Affects Versions: 5.7.0
            Reporter: Markus Hahn


http://{server}:8161/admin/queues.jsp

I get negative numbers in the "Number Of Pending Messages" column. I am running
a simple PTP scenario, but purging the queue right in the middle of the action.

Below is the experiment. ActiveMQ 5.7.0 was run out of the box on CentOS 6.3
with Java 6.
_________________________________________________________________

import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;


public class PTPTest {

        //static String _url = "failover://tcp://activemqtest:61616";
        static String _url = ActiveMQConnection.DEFAULT_BROKER_URL;
        
        public void enqueue() throws Exception {
                
                Connection connection = null;
                try 
                {
                        System.out.println("enqueuing...");
                        
                        ConnectionFactory connectionFactory =
                     new ActiveMQConnectionFactory(_url);
                        connection = connectionFactory.createConnection();
                connection.start();

                Session s = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
                //Session s = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);

                Destination dest = s.createQueue("Q2");

                MessageProducer mp = s.createProducer(dest);
                mp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);

                for(int num = 1; num <= 100;num++) {
                        String txt = "this is message #" + num;
                        TextMessage tmsg = s.createTextMessage(txt);
                        mp.send(tmsg);
                        Thread.sleep(10);
                        //System.out.printf(">>> %s\n", txt);   
                }
            }
                finally {
                 if (null != connection) {
                         connection.close();
                 }
                }
        }
        
        public void dequeue(int id) throws Exception {
                
                Connection connection = null;
                
                try {
                        ConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(_url);
                        connection = connectionFactory.createConnection();
                        connection.start();

                        Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);

                        Destination destination = session.createQueue("Q2");

                        MessageConsumer consumer = 
session.createConsumer(destination);

                        for(;;) {
                                Message msg = consumer.receive();
                                TextMessage tmsg = (TextMessage)msg;
                                String txt = tmsg.getText();
                                System.out.printf("<<< [%d] '%s', %s, %s\n", 
                                                id, txt, 
                                                tmsg.getJMSMessageID(), 
                                                tmsg.getJMSDestination());
                                Thread.sleep(1000);
                        }
                }
                finally {
                        connection.close();
                }
        }
        
        public void exec() throws Exception {
                Thread ethrd, dthrds[];

                ethrd = new Thread() {
                        public void run() {
                                try {
                                        enqueue();
                                }
                                catch (Exception e) {
                                        e.printStackTrace();
                                }
                        }
                };
                ethrd.start();
                Thread.sleep(1000);

                final int D_COUNT = 4;
                dthrds = new Thread[D_COUNT];
                for (int i = 0; i < dthrds.length; i++) {
                        final int ii = i;
                        dthrds[i] = new Thread() {
                                public void run() {
                                        try {
                                                dequeue(ii);
                                        }
                                        catch (Exception e) {
                                                e.printStackTrace();
                                        }
                                }
                        };
                }
                for (Thread dthrd : dthrds) {
                        dthrd.start();
                }
                
                Thread.sleep(60000);
                        
                ethrd.interrupt();
                ethrd.join();

                for (Thread dthrd : dthrds) {
                        dthrd.interrupt();
                        dthrd.join();
                } 
        }
        
        public static void main(String[] args) throws Exception {
                new PTPTest().exec();
                System.out.println("\nDONE.");
        }
}







--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira

Reply via email to