Markus Hahn created AMQ-4109:
--------------------------------
Summary: Negative queue counters
Key: AMQ-4109
URL: https://issues.apache.org/jira/browse/AMQ-4109
Project: ActiveMQ
Issue Type: Bug
Affects Versions: 5.7.0
Reporter: Markus Hahn
http://{server}:8161/admin/queues.jsp
I get negative numbers in the "Number Of Pending Messages". I am running a simple
PTP scenario and purging the queue right in the middle of the action.
The experiment is below. 5.7.0 was run out of the box on CentOS 6.3 with Java
6.
_________________________________________________________________
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class PTPTest {
//static String _url = "failover://tcp://activemqtest:61616";
static String _url = ActiveMQConnection.DEFAULT_BROKER_URL;
public void enqueue() throws Exception {
Connection connection = null;
try
{
System.out.println("enqueuing...");
ConnectionFactory connectionFactory =
new ActiveMQConnectionFactory(_url);
connection = connectionFactory.createConnection();
connection.start();
Session s = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
//Session s = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination dest = s.createQueue("Q2");
MessageProducer mp = s.createProducer(dest);
mp.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
for(int num = 1; num <= 100;num++) {
String txt = "this is message #" + num;
TextMessage tmsg = s.createTextMessage(txt);
mp.send(tmsg);
Thread.sleep(10);
//System.out.printf(">>> %s\n", txt);
}
}
finally {
if (null != connection) {
connection.close();
}
}
}
public void dequeue(int id) throws Exception {
Connection connection = null;
try {
ConnectionFactory connectionFactory = new
ActiveMQConnectionFactory(_url);
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("Q2");
MessageConsumer consumer =
session.createConsumer(destination);
for(;;) {
Message msg = consumer.receive();
TextMessage tmsg = (TextMessage)msg;
String txt = tmsg.getText();
System.out.printf("<<< [%d] '%s', %s, %s\n",
id, txt,
tmsg.getJMSMessageID(),
tmsg.getJMSDestination());
Thread.sleep(1000);
}
}
finally {
connection.close();
}
}
public void exec() throws Exception {
Thread ethrd, dthrds[];
ethrd = new Thread() {
public void run() {
try {
enqueue();
}
catch (Exception e) {
e.printStackTrace();
}
}
};
ethrd.start();
Thread.sleep(1000);
final int D_COUNT = 4;
dthrds = new Thread[D_COUNT];
for (int i = 0; i < dthrds.length; i++) {
final int ii = i;
dthrds[i] = new Thread() {
public void run() {
try {
dequeue(ii);
}
catch (Exception e) {
e.printStackTrace();
}
}
};
}
for (Thread dthrd : dthrds) {
dthrd.start();
}
Thread.sleep(60000);
ethrd.interrupt();
ethrd.join();
for (Thread dthrd : dthrds) {
dthrd.interrupt();
dthrd.join();
}
}
public static void main(String[] args) throws Exception {
new PTPTest().exec();
System.out.println("\nDONE.");
}
}
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira