After some re-testing of this issue I can confirm it is still present.

I've developed a new test case that creates a message producer and consumer on 
two separate JBoss Messaging servers. By continually shutting down and 
restarting each JBoss Messaging server I can eventually cause the shutting down 
of one server to stop the message listeners on the *other* server.

This test case also demonstrates the same reconnection issue that I've raised 
in this post http://www.jboss.org/index.html?module=bb&op=viewtopic&t=102233

Our JBoss Messaging servers have different ServerPeerIDs and StoreIds as 
suggested elsewhere.

To run the test case, start both JBoss Messaging servers then start the test 
case. A producer will be created on each server and will start dispatching 
messages to a queue. A message listener on the queue will acknowledge the 
dispatched message. Stop one of the servers for a while then restart it. The 
producer and consumer on that server should start up again. Quite often they do 
not.


  | import java.util.Hashtable;
  | 
  | import javax.jms.Connection;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.ExceptionListener;
  | import javax.jms.JMSException;
  | import javax.jms.Message;
  | import javax.jms.MessageConsumer;
  | import javax.jms.MessageListener;
  | import javax.jms.MessageProducer;
  | import javax.jms.Queue;
  | import javax.jms.Session;
  | import javax.naming.Context;
  | import javax.naming.InitialContext;
  | 
  | import org.apache.commons.logging.Log;
  | import org.apache.commons.logging.LogFactory;
  | 
  | public class MultipleServerReconnectTest {
  | 
  |     class DispatcherThread extends Thread {
  |             private ConnectionFactory connectionFactory;
  | 
  |             private String id;
  | 
  |             private boolean initialised = false;
  | 
  |             private Queue queue;
  | 
  |             private boolean recycle = false;
  | 
  |             private boolean shutdown = false;
  | 
  |             public DispatcherThread(ConnectionFactory connectionFactory,//
  |                             Queue queue, String id) {
  |                     super();
  |                     this.connectionFactory = connectionFactory;
  |                     this.queue = queue;
  |                     this.id = id;
  |                     this.setName(id);
  |             }
  | 
  |             private boolean isRecycle() {
  |                     return recycle;
  |             }
  | 
  |             public void run() {
  |                     Connection connection = null;
  |                     Session session = null;
  |                     MessageProducer producer = null;
  |                     ExceptionListener exceptionListener = null;
  | 
  |                     while (!shutdown) {
  |                             if (!initialised) {
  |                                     try {
  |                                             connection = 
connectionFactory.createConnection();
  |                                             exceptionListener = new 
ExceptionListener() {
  |                                                     public void 
onException(JMSException ex) {
  |                                                             
LOG.error("Received connection exception", ex);
  |                                                             recycle = true;
  |                                                     }
  |                                             };
  |                                             
connection.setExceptionListener(exceptionListener);
  |                                             session = 
connection.createSession(false,
  |                                                             
Session.AUTO_ACKNOWLEDGE);
  |                                             producer = 
session.createProducer(queue);
  |                                             LOG.info(id + " initialised");
  |                                             initialised = true;
  |                                     } catch (JMSException ex) {
  |                                             LOG.error("Caught exception 
during initialisation", ex);
  |                                             recycle = true;
  |                                     }
  |                             }
  |                             if (isRecycle()) {
  |                                     if (producer != null) {
  |                                             try {
  |                                                     producer.close();
  |                                             } catch (Exception ex) {
  |                                                     LOG.error("Caught 
exception during producer close",
  |                                                                     ex);
  |                                             }
  |                                     }
  |                                     if (session != null) {
  |                                             try {
  |                                                     session.close();
  |                                             } catch (Exception ex) {
  |                                                     LOG.error("Caught 
exception during session close",
  |                                                                     ex);
  |                                             }
  |                                     }
  |                                     if (connection != null) {
  |                                             try {
  |                                                     connection.close();
  |                                             } catch (Exception ex) {
  |                                                     LOG.error(
  |                                                                     "Caught 
exception during connection close",
  |                                                                     ex);
  |                                             }
  |                                     }
  |                                     producer = null;
  |                                     session = null;
  |                                     connection = null;
  |                                     initialised = false;
  |                                     recycle = false;
  |                             }
  |                             if (initialised && (!recycle) && (!shutdown)) {
  |                                     try {
  |                                             Thread.sleep(1000);
  |                                             Message message = session
  |                                                             
.createTextMessage("This is a test");
  |                                             producer.send(message);
  |                                             LOG.info(id + " dispatched 
message");
  |                                     } catch (Exception ex) {
  |                                             LOG.error("Caught exception 
during send", ex);
  |                                             recycle = true;
  |                                     }
  |                             }
  |                     }
  |             }
  | 
  |             public void shutdown() {
  |                     LOG.info(id + " is shutting down");
  |                     recycle = true;
  |                     shutdown = true;
  |             }
  |     }
  | 
  |     class ListenerManagerThread extends Thread {
  |             private ConnectionFactory connectionFactory;
  | 
  |             private String id;
  | 
  |             private boolean initialised = false;
  | 
  |             private MessageListener messageListener;
  | 
  |             private Queue queue;
  | 
  |             private boolean recycle = false;
  | 
  |             private boolean shutdown = false;
  | 
  |             public ListenerManagerThread(ConnectionFactory 
connectionFactory,
  |                             Queue queue, MessageListener messageListener, 
String id) {
  |                     super();
  |                     this.connectionFactory = connectionFactory;
  |                     this.queue = queue;
  |                     this.messageListener = messageListener;
  |                     this.id = id;
  |                     this.setName(id);
  |             }
  | 
  |             private boolean isRecycle() {
  |                     return recycle;
  |             }
  | 
  |             public void run() {
  |                     Connection connection = null;
  |                     Session session = null;
  |                     MessageConsumer consumer = null;
  |                     ExceptionListener exceptionListener = null;
  | 
  |                     while (!shutdown) {
  |                             if (!initialised) {
  |                                     try {
  |                                             connection = 
connectionFactory.createConnection();
  |                                             exceptionListener = new 
ExceptionListener() {
  |                                                     public void 
onException(JMSException ex) {
  |                                                             
LOG.error("Received connection exception", ex);
  |                                                             recycle = true;
  |                                                     }
  |                                             };
  |                                             
connection.setExceptionListener(exceptionListener);
  |                                             session = 
connection.createSession(false,
  |                                                             
Session.AUTO_ACKNOWLEDGE);
  |                                             consumer = 
session.createConsumer(queue);
  |                                             
consumer.setMessageListener(messageListener);
  |                                             connection.start();
  |                                             LOG.info(id + " initialised");
  |                                             initialised = true;
  |                                     } catch (JMSException ex) {
  |                                             LOG.error("Caught exception 
during initialisation", ex);
  |                                             recycle = true;
  |                                     }
  |                             }
  |                             if (isRecycle()) {
  |                                     if (consumer != null) {
  |                                             try {
  |                                                     
consumer.setMessageListener(null);
  |                                                     consumer.close();
  |                                             } catch (Exception ex) {
  |                                                     LOG.error("Caught 
exception during consumer close",
  |                                                                     ex);
  |                                             }
  |                                     }
  |                                     if (session != null) {
  |                                             try {
  |                                                     session.close();
  |                                             } catch (Exception ex) {
  |                                                     LOG.error("Caught 
exception during session close",
  |                                                                     ex);
  |                                             }
  |                                     }
  |                                     if (connection != null) {
  |                                             try {
  |                                                     connection.close();
  |                                             } catch (Exception ex) {
  |                                                     LOG.error(
  |                                                                     "Caught 
exception during connection close",
  |                                                                     ex);
  |                                             }
  |                                     }
  |                                     consumer = null;
  |                                     session = null;
  |                                     connection = null;
  |                                     initialised = false;
  |                                     recycle = false;
  |                             }
  |                             try {
  |                                     Thread.sleep(1000);
  |                             } catch (InterruptedException ex) {
  |                                     LOG.error("Caught exception during 
sleep");
  |                             }
  |                     }
  |             }
  | 
  |             public void shutdown() {
  |                     LOG.info(id + " is shutting down");
  |                     recycle = true;
  |                     shutdown = true;
  |             }
  |     }
  | 
  |     class SimpleListener implements MessageListener {
  | 
  |             private String id;
  | 
  |             public SimpleListener(String id) {
  |                     super();
  |                     this.id = id;
  |             }
  | 
  |             /**
  |              * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
  |              */
  |             public void onMessage(Message message) {
  |                     LOG.info(id + " received message");
  |             }
  | 
  |     }
  | 
  |     private static final Log LOG = LogFactory
  |                     .getLog(MultipleServerReconnectTest.class);
  | 
  |     public static void main(String[] args) {
  |             MultipleServerReconnectTest test = new 
MultipleServerReconnectTest();
  | 
  |             try {
  |                     test.start();
  |             } catch (Throwable ex) {
  |                     LOG.error("Caught exception in main", ex);
  |             }
  |     }
  | 
  |     private void start() throws Exception {
  |             /*
  |              * If you want to run the following test case under ActiveMQ 
3.2.1 then
  |              * only the following properties are required:
  |              * 
  |              * properties1.put(Context.INITIAL_CONTEXT_FACTORY,
  |              * "org.activemq.jndi.ActiveMQInitialContextFactory");
  |              * properties1.put(Context.PROVIDER_URL, 
"tcp://localhost:61616");
  |              * properties2.put(Context.INITIAL_CONTEXT_FACTORY,
  |              * "org.activemq.jndi.ActiveMQInitialContextFactory");
  |              * properties2.put(Context.PROVIDER_URL, 
"tcp://localhost:61617");
  |              * 
  |              * For ActiveMQ 4.1.0 the required context factory is
  |              * org.apache.activemq.jndi.ActiveMQInitialContextFactory
  |              */
  |             // Setup connection 1
  |             Hashtable properties1 = new Hashtable();
  |             properties1.put(Context.INITIAL_CONTEXT_FACTORY,
  |                             "org.jnp.interfaces.NamingContextFactory");
  |             properties1.put(Context.URL_PKG_PREFIXES,
  |                             "org.jboss.naming:org.jnp.interfaces");
  |             properties1.put(Context.PROVIDER_URL, "jnp://localhost:1099");
  |             properties1.put(Context.SECURITY_PRINCIPAL, "admin");
  |             properties1.put(Context.SECURITY_CREDENTIALS, "admin");
  | 
  |             // Setup connection 2
  |             Hashtable properties2 = new Hashtable();
  |             properties2.put(Context.INITIAL_CONTEXT_FACTORY,
  |                             "org.jnp.interfaces.NamingContextFactory");
  |             properties2.put(Context.URL_PKG_PREFIXES,
  |                             "org.jboss.naming:org.jnp.interfaces");
  |             // change the following url to point to your second jboss 
instance
  |             properties2.put(Context.PROVIDER_URL, "jnp://otherhost:1099");
  |             properties2.put(Context.SECURITY_PRINCIPAL, "admin");
  |             properties2.put(Context.SECURITY_CREDENTIALS, "admin");
  | 
  |             ConnectionFactory connectionFactory1 = null;
  |             Queue queue1 = null;
  |             Context context1 = null;
  | 
  |             context1 = new InitialContext(properties1);
  |             connectionFactory1 = (ConnectionFactory) context1
  |                             .lookup("ConnectionFactory");
  |             // Make sure this queue has been configured on your jboss server
  |             // (under ActiveMQ use "dynamicQueues/testQueue")
  |             queue1 = (Queue) context1.lookup("/queue/tc1_q1");
  | 
  |             ConnectionFactory connectionFactory2 = null;
  |             Queue queue2 = null;
  |             Context context2 = null;
  | 
  |             context2 = new InitialContext(properties2);
  |             connectionFactory2 = (ConnectionFactory) context2
  |                             .lookup("ConnectionFactory");
  |             // Make sure this queue has been configured on your jboss server
  |             // (under ActiveMQ use "dynamicQueues/testQueue")
  |             queue2 = (Queue) context2.lookup("/queue/tc1_q1");
  | 
  |             MessageListener listener1 = new SimpleListener("Listener.1");
  |             ListenerManagerThread manager1 = new ListenerManagerThread(
  |                             connectionFactory1, queue1, listener1, 
"ListenerManager.1");
  |             manager1.start();
  | 
  |             DispatcherThread dispatcher1 = new 
DispatcherThread(connectionFactory1,
  |                             queue1, "Dispatcher.1");
  |             dispatcher1.start();
  | 
  |             MessageListener listener2 = new SimpleListener("Listener.2");
  |             ListenerManagerThread manager2 = new ListenerManagerThread(
  |                             connectionFactory2, queue2, listener2, 
"ListenerManager.2");
  |             manager2.start();
  | 
  |             DispatcherThread dispatcher2 = new 
DispatcherThread(connectionFactory2,
  |                             queue2, "Dispatcher.2");
  |             dispatcher2.start();
  | 
  |             // 10 minutes
  |             Thread.sleep(600000);
  | 
  |             manager1.shutdown();
  |             manager1.join();
  | 
  |             dispatcher1.shutdown();
  |             dispatcher1.join();
  | 
  |             manager2.shutdown();
  |             manager2.join();
  | 
  |             dispatcher2.shutdown();
  |             dispatcher2.join();
  | 
  |             context1.close();
  |             context2.close();
  |     }
  | }
  | 

View the original post : 
http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4020399#4020399

Reply to the post : 
http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4020399
_______________________________________________
jboss-user mailing list
[email protected]
https://lists.jboss.org/mailman/listinfo/jboss-user

Reply via email to