I have enhanced my test harness again and have managed to reproduce the problem
of the listeners dying. I have made the harness emulate pretty much exactly
what we do in our application for receiving messages. Our application receives
messages from multiple queues, and we have a manager thread for each queue
which manages the process of connecting to the queue and setting up the right
number of message listeners. In my example, I have set it up so that the first
queue moves its messages to the second queue, the second queue to the third
queue and the third queue moves them back to the first queue. In this way, it
generates a constant flow of messages for the test.
When I ran this, I got the following output in the console after an hour and a
half:
| Checking core.internal.bulkUpload
| Checking core.internal.bulkUpload.error
| Checking billing.response
| Checking core.internal.publication
| Checking publish.request
| Checking billing.response.error
| Checking core.track.error
| Checking core.track
| ********** core.track.2 has stopped **********
| ********** core.track.3 has stopped **********
| ********** core.track.4 has stopped **********
|
After four hours, more listeners had stopped:
| Checking core.internal.bulkUpload
| Checking billing.response
| Checking core.track
| ********** core.track.2 has stopped **********
| ********** core.track.3 has stopped **********
| ********** core.track.4 has stopped **********
| Checking billing.response.error
| Checking core.internal.publication
| ********** core.internal.publication.4 has stopped **********
| ********** core.internal.publication.6 has stopped **********
| ********** core.internal.publication.7 has stopped **********
| ********** core.internal.publication.8 has stopped **********
| ********** core.internal.publication.9 has stopped **********
| Checking core.track.error
| Checking publish.request
|
If I go into the debugger in Eclipse and pause these threads, I get the
following stack trace:
| Thread [core.track.4] (Suspended)
| Object.wait(long) line: not available [native method]
| Object.wait() line: 429
| LinkedQueue.take() line: not available
| QueuedExecutor$RunLoop.run() line: not available
| Thread.run() line: 534
|
I've checked in the JMX console, and the core.track queue has over 9,000
messages waiting to be processed, so there is plenty for these threads to do.
The core.internal.publication queue had 280 messages, so there still should be
work for the listeners that have stopped.
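The stack trace above came from pausing the threads in the debugger. The same information could be captured from inside the harness when a listener is reported as stopped. The following is only a minimal sketch, assuming a Java 5 or later JVM (Thread.getAllStackTraces() and Thread.getState() do not exist before that); the class and method names are hypothetical and not part of the harness:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper (not part of the original harness): builds a textual
// dump of every live thread whose name starts with a given prefix.
final class ListenerThreadDump {

    // Returns one report per matching thread: its name, its state and its
    // stack frames, e.g. for the prefix "core.track.".
    static List<String> describe(String namePrefix) {
        List<String> reports = new ArrayList<String>();
        for (Map.Entry<Thread, StackTraceElement[]> entry
                : Thread.getAllStackTraces().entrySet()) {
            Thread thread = entry.getKey();
            if (!thread.getName().startsWith(namePrefix)) {
                continue;
            }
            StringBuilder report = new StringBuilder();
            report.append("Thread [").append(thread.getName()).append("] ")
                    .append(thread.getState());
            for (StackTraceElement frame : entry.getValue()) {
                report.append("\n    at ").append(frame);
            }
            reports.add(report.toString());
        }
        return reports;
    }

    public static void main(String[] args) {
        // Dump the threads that the harness renames to "<queue>.<index>".
        for (String report : describe("core.track.")) {
            System.out.println(report);
        }
    }
}
```

Calling describe(receiveQueue + ".") next to the "has stopped" message would show whether each stopped listener thread is idle in LinkedQueue.take(), as in the trace above, or blocked somewhere else.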
The new test harness is below. I've created the same number of listeners for
all of our queues as we have in our application. These queues are configured
in JBoss as follows:
Queue                            FullSize   PageSize   DownCacheSize
publish.request                  10000      2000       2000
core.track                       10000      2000       2000
core.internal.publication        10000      2000       2000
core.track.error                 10000      2000       2000
billing.response                 10000      2000       2000
billing.response.error           2000       200        200
core.internal.bulkUpload         100        10         10
core.internal.bulkUpload.error   100        10         10
A template of our queue configuration is as follows:
| <mbean code="org.jboss.jms.server.destination.Queue"
| name="jboss.messaging.destination:service=Queue,name=core.track"
| xmbean-dd="xmdesc/Queue-xmbean.xml">
| <depends optional-attribute-name="ServerPeer">jboss.messaging:service=ServerPeer</depends>
| <attribute name="SecurityConfig">
| <security>
| <role name="guest" read="true" write="true"/>
| <role name="publisher" read="true" write="true" create="false"/>
| <role name="noacc" read="false" write="false" create="false"/>
| </security>
| </attribute>
| <attribute name="FullSize">10000</attribute>
| <attribute name="PageSize">2000</attribute>
| <attribute name="DownCacheSize">2000</attribute>
| </mbean>
|
To run the harness, I start with a new instance of JBoss Messaging and seed the
publish.request queue with lots (10,000+) of text messages. I then start the
harness which simply moves the messages between the publish.request,
core.internal.publication and core.track queues. I don't put any messages in
the other queues, as they don't have messages in them when our application
fails. I then let the harness run until it displays the "has stopped"
messages.
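| import java.util.Hashtable;
|
| import javax.jms.Connection;
| import javax.jms.ConnectionFactory;
| import javax.jms.JMSException;
| import javax.jms.Message;
| import javax.jms.MessageConsumer;
| import javax.jms.MessageListener;
| import javax.jms.MessageProducer;
| import javax.jms.Queue;
| import javax.jms.Session;
| import javax.jms.TextMessage;
| import javax.naming.Context;
| import javax.naming.InitialContext;
|
| public class TestMultiSessionMessageListener {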
|
| public static void main(String[] args) {
| TestMultiSessionMessageListener ml = new TestMultiSessionMessageListener();
|
| try {
| ml.test();
| } catch (Exception e) {
| e.printStackTrace();
| }
| }
|
| private void test() throws Exception {
| boolean printMessages = false;
|
| createListeners("publish.request", "core.track", 5, 250, printMessages);
| Thread.sleep(2000);
| createListeners("core.track", "core.internal.publication", 5, 250,
| printMessages);
| Thread.sleep(2000);
| createListeners("core.internal.publication", "publish.request", 10,
| 750, printMessages);
| Thread.sleep(2000);
| createListeners("core.track.error", "core.track", 2, 100,
| printMessages);
| Thread.sleep(2000);
| createListeners("billing.response", "publish.request", 2, 100,
| printMessages);
| Thread.sleep(2000);
| createListeners("billing.response.error", "billing.response", 2, 100,
| printMessages);
| Thread.sleep(2000);
| createListeners("core.internal.bulkUpload", "publish.request", 3, 100,
| printMessages);
| Thread.sleep(2000);
| createListeners("core.internal.bulkUpload.error",
| "core.internal.bulkUpload", 1, 100, printMessages);
| Thread.sleep(Long.MAX_VALUE);
| }
|
| private void createListeners(final String receiveQueue,
| final String sendQueue, final int numberOfProcesses,
| final long delay, final boolean printMessages) throws Exception {
|
| Runnable runnable = new Runnable() {
|
| public void run() {
| Hashtable properties = new Hashtable();
| properties.put(Context.INITIAL_CONTEXT_FACTORY,
| "org.jnp.interfaces.NamingContextFactory");
| properties.put(Context.URL_PKG_PREFIXES,
| "org.jboss.naming:org.jnp.interfaces");
| properties.put(Context.PROVIDER_URL, "jnp://localhost:1099");
| properties.put(Context.SECURITY_PRINCIPAL, "admin");
| properties.put(Context.SECURITY_CREDENTIALS, "admin");
|
| ConnectionFactory connectionFactory = null;
|
| try {
| Context context = new InitialContext(properties);
| connectionFactory = (ConnectionFactory) context
| .lookup("ConnectionFactory");
| Connection connection = connectionFactory
| .createConnection();
| connection.start();
|
| MessageConsumer[] consumers = new MessageConsumer[numberOfProcesses];
| Session[] sessions = new Session[numberOfProcesses];
| MessageListener[] listeners = new MessageListener[numberOfProcesses];
| final long[] lastReceived = new long[numberOfProcesses];
|
| for (int j = 0; j < consumers.length; j++) {
| sessions[j] = connection.createSession(false,
| Session.AUTO_ACKNOWLEDGE);
| final Queue sourceQ = sessions[j]
| .createQueue(receiveQueue);
| final Queue destQ = sessions[j].createQueue(sendQueue);
| final Session session = sessions[j];
|
| consumers[j] = sessions[j].createConsumer(sourceQ);
| lastReceived[j] = 0;
| final int count = j;
|
| listeners[j] = new MessageListener() {
| public String threadName;
|
| private boolean threadNameChanged = false;
|
| public void onMessage(Message msg) {
| lastReceived[count] = System
| .currentTimeMillis();
| try {
| if (!threadNameChanged) {
| threadName = receiveQueue + "." + count;
| Thread.currentThread().setName(
| threadName);
| threadNameChanged = true;
| }
| String payload = ((TextMessage) msg)
| .getText();
| if (printMessages) {
| System.out.println(Thread
| .currentThread().getName()
| + " " + payload);
| }
| MessageProducer producer = session
| .createProducer(destQ);
| try {
| // Simulate normal processing time
| Thread.sleep(delay);
| } catch (InterruptedException e) {
| e.printStackTrace();
| }
| producer.send(msg);
| producer.close();
| } catch (JMSException e) {
| e.printStackTrace();
| }
| }
| };
|
| consumers[j].setMessageListener(listeners[j]);
| }
| // Thread.currentThread().sleep(Long.MAX_VALUE);
| while (true) {
| Thread.sleep(60000);
| System.out.println("Checking " + receiveQueue);
| long currentTime = System.currentTimeMillis();
| for (int j = 0; j < numberOfProcesses; j++) {
| if (lastReceived[j] > 0
| && currentTime - lastReceived[j] > 60000) {
| System.out.println("********** " + receiveQueue + "."
| + j + " has stopped **********");
| }
| }
| }
| } catch (Exception e) {
| throw new RuntimeException(e);
| }
| }
| };
|
| Thread t = new Thread(runnable);
| t.setName("Manager " + receiveQueue);
| t.start();
| }
| }
|
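The harness records each listener's last delivery time in a plain long[] that is written by the delivery threads and read by the manager thread, so the manager is not guaranteed to see the latest values. The stopped-listener check could be isolated as below. This is only a sketch of the same rule (a listener that has received at least one message but nothing for more than 60 seconds), assuming Java 5 or later for java.util.concurrent.atomic; StallDetector and its methods are hypothetical names, not part of the harness:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLongArray;

// Hypothetical helper (not part of the original harness): tracks the last
// delivery time of each listener and reports the ones that have gone quiet.
final class StallDetector {
    private final String queueName;
    private final long timeoutMillis;
    // One slot per listener; 0 means "has not received anything yet".
    // AtomicLongArray makes writes from delivery threads visible to the
    // manager thread that runs the check.
    private final AtomicLongArray lastReceived;

    StallDetector(String queueName, int listenerCount, long timeoutMillis) {
        this.queueName = queueName;
        this.timeoutMillis = timeoutMillis;
        this.lastReceived = new AtomicLongArray(listenerCount);
    }

    // Intended to be called from onMessage() on the delivery thread.
    void markReceived(int listenerIndex, long nowMillis) {
        lastReceived.set(listenerIndex, nowMillis);
    }

    // Intended to be called from the manager thread. Returns names such as
    // "core.track.2" for listeners that received at least one message but
    // none within the timeout, the same condition the harness uses.
    List<String> findStalled(long nowMillis) {
        List<String> stalled = new ArrayList<String>();
        for (int i = 0; i < lastReceived.length(); i++) {
            long last = lastReceived.get(i);
            if (last > 0 && nowMillis - last > timeoutMillis) {
                stalled.add(queueName + "." + i);
            }
        }
        return stalled;
    }

    public static void main(String[] args) {
        StallDetector detector = new StallDetector("core.track", 5, 60000L);
        detector.markReceived(2, 1000L);   // last delivery long ago
        detector.markReceived(3, 50000L);  // recent delivery
        System.out.println(detector.findStalled(70000L)); // prints [core.track.2]
    }
}
```

As in the harness, a listener that has never received a message is not reported, and a listener on a queue that has simply run empty would be, so the output still has to be read alongside the queue depths in the JMX console.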
View the original post :
http://www.jboss.com/index.html?module=bb&op=viewtopic&p=3962366#3962366