After re-testing this issue, I can confirm that it is still present. I've developed a new test case that creates a message producer and a message consumer on each of two separate JBoss Messaging servers. By repeatedly shutting down and restarting each JBoss Messaging server, I can eventually cause the shutdown of one server to stop the message listeners on the *other* server.
This test case also demonstrates the same reconnection issue that I've raised in this post http://www.jboss.org/index.html?module=bb&op=viewtopic&t=102233 Our JBoss Messaging servers have different ServerPeerIDs and StoreIds as suggested elsewhere. To run the test case, start both JBoss Messaging servers then start the test case. A producer will be created on each server and will start dispatching messages to a queue. A message listener on the queue will acknowledge the dispatched message. Stop one of the servers for a while then restart it. The producer and consumer on that server should start up again. Quite often they do not. | import java.util.Hashtable; | | import javax.jms.Connection; | import javax.jms.ConnectionFactory; | import javax.jms.ExceptionListener; | import javax.jms.JMSException; | import javax.jms.Message; | import javax.jms.MessageConsumer; | import javax.jms.MessageListener; | import javax.jms.MessageProducer; | import javax.jms.Queue; | import javax.jms.Session; | import javax.naming.Context; | import javax.naming.InitialContext; | | import org.apache.commons.logging.Log; | import org.apache.commons.logging.LogFactory; | | public class MultipleServerReconnectTest { | | class DispatcherThread extends Thread { | private ConnectionFactory connectionFactory; | | private String id; | | private boolean initialised = false; | | private Queue queue; | | private boolean recycle = false; | | private boolean shutdown = false; | | public DispatcherThread(ConnectionFactory connectionFactory,// | Queue queue, String id) { | super(); | this.connectionFactory = connectionFactory; | this.queue = queue; | this.id = id; | this.setName(id); | } | | private boolean isRecycle() { | return recycle; | } | | public void run() { | Connection connection = null; | Session session = null; | MessageProducer producer = null; | ExceptionListener exceptionListener = null; | | while (!shutdown) { | if (!initialised) { | try { | connection = 
connectionFactory.createConnection(); | exceptionListener = new ExceptionListener() { | public void onException(JMSException ex) { | LOG.error("Received connection exception", ex); | recycle = true; | } | }; | connection.setExceptionListener(exceptionListener); | session = connection.createSession(false, | Session.AUTO_ACKNOWLEDGE); | producer = session.createProducer(queue); | LOG.info(id + " initialised"); | initialised = true; | } catch (JMSException ex) { | LOG.error("Caught exception during initialisation", ex); | recycle = true; | } | } | if (isRecycle()) { | if (producer != null) { | try { | producer.close(); | } catch (Exception ex) { | LOG.error("Caught exception during producer close", | ex); | } | } | if (session != null) { | try { | session.close(); | } catch (Exception ex) { | LOG.error("Caught exception during session close", | ex); | } | } | if (connection != null) { | try { | connection.close(); | } catch (Exception ex) { | LOG.error( | "Caught exception during connection close", | ex); | } | } | producer = null; | session = null; | connection = null; | initialised = false; | recycle = false; | } | if (initialised && (!recycle) && (!shutdown)) { | try { | Thread.sleep(1000); | Message message = session | .createTextMessage("This is a test"); | producer.send(message); | LOG.info(id + " dispatched message"); | } catch (Exception ex) { | LOG.error("Caught exception during send", ex); | recycle = true; | } | } | } | } | | public void shutdown() { | LOG.info(id + " is shutting down"); | recycle = true; | shutdown = true; | } | } | | class ListenerManagerThread extends Thread { | private ConnectionFactory connectionFactory; | | private String id; | | private boolean initialised = false; | | private MessageListener messageListener; | | private Queue queue; | | private boolean recycle = false; | | private boolean shutdown = false; | | public ListenerManagerThread(ConnectionFactory connectionFactory, | Queue queue, MessageListener messageListener, String id) 
{ | super(); | this.connectionFactory = connectionFactory; | this.queue = queue; | this.messageListener = messageListener; | this.id = id; | this.setName(id); | } | | private boolean isRecycle() { | return recycle; | } | | public void run() { | Connection connection = null; | Session session = null; | MessageConsumer consumer = null; | ExceptionListener exceptionListener = null; | | while (!shutdown) { | if (!initialised) { | try { | connection = connectionFactory.createConnection(); | exceptionListener = new ExceptionListener() { | public void onException(JMSException ex) { | LOG.error("Received connection exception", ex); | recycle = true; | } | }; | connection.setExceptionListener(exceptionListener); | session = connection.createSession(false, | Session.AUTO_ACKNOWLEDGE); | consumer = session.createConsumer(queue); | consumer.setMessageListener(messageListener); | connection.start(); | LOG.info(id + " initialised"); | initialised = true; | } catch (JMSException ex) { | LOG.error("Caught exception during initialisation", ex); | recycle = true; | } | } | if (isRecycle()) { | if (consumer != null) { | try { | consumer.setMessageListener(null); | consumer.close(); | } catch (Exception ex) { | LOG.error("Caught exception during consumer close", | ex); | } | } | if (session != null) { | try { | session.close(); | } catch (Exception ex) { | LOG.error("Caught exception during session close", | ex); | } | } | if (connection != null) { | try { | connection.close(); | } catch (Exception ex) { | LOG.error( | "Caught exception during connection close", | ex); | } | } | consumer = null; | session = null; | connection = null; | initialised = false; | recycle = false; | } | try { | Thread.sleep(1000); | } catch (InterruptedException ex) { | LOG.error("Caught exception during sleep"); | } | } | } | | public void shutdown() { | LOG.info(id + " is shutting down"); | recycle = true; | shutdown = true; | } | } | | class SimpleListener implements MessageListener { | | private String 
id; | | public SimpleListener(String id) { | super(); | this.id = id; | } | | /** | * @see javax.jms.MessageListener#onMessage(javax.jms.Message) | */ | public void onMessage(Message message) { | LOG.info(id + " received message"); | } | | } | | private static final Log LOG = LogFactory | .getLog(MultipleServerReconnectTest.class); | | public static void main(String[] args) { | MultipleServerReconnectTest test = new MultipleServerReconnectTest(); | | try { | test.start(); | } catch (Throwable ex) { | LOG.error("Caught exception in main", ex); | } | } | | private void start() throws Exception { | /* | * If you want to run the following test case under ActiveMQ 3.2.1 then | * only the following properties are required: | * | * properties1.put(Context.INITIAL_CONTEXT_FACTORY, | * "org.activemq.jndi.ActiveMQInitialContextFactory"); | * properties1.put(Context.PROVIDER_URL, "tcp://localhost:61616"); | * properties2.put(Context.INITIAL_CONTEXT_FACTORY, | * "org.activemq.jndi.ActiveMQInitialContextFactory"); | * properties2.put(Context.PROVIDER_URL, "tcp://localhost:61617"); | * | * For ActiveMQ 4.1.0 the required context factory is | * org.apache.activemq.jndi.ActiveMQInitialContextFactory | */ | // Setup connection 1 | Hashtable properties1 = new Hashtable(); | properties1.put(Context.INITIAL_CONTEXT_FACTORY, | "org.jnp.interfaces.NamingContextFactory"); | properties1.put(Context.URL_PKG_PREFIXES, | "org.jboss.naming:org.jnp.interfaces"); | properties1.put(Context.PROVIDER_URL, "jnp://localhost:1099"); | properties1.put(Context.SECURITY_PRINCIPAL, "admin"); | properties1.put(Context.SECURITY_CREDENTIALS, "admin"); | | // Setup connection 2 | Hashtable properties2 = new Hashtable(); | properties2.put(Context.INITIAL_CONTEXT_FACTORY, | "org.jnp.interfaces.NamingContextFactory"); | properties2.put(Context.URL_PKG_PREFIXES, | "org.jboss.naming:org.jnp.interfaces"); | // change the following url to point to your second jboss instance | properties2.put(Context.PROVIDER_URL, 
"jnp://otherhost:1099"); | properties2.put(Context.SECURITY_PRINCIPAL, "admin"); | properties2.put(Context.SECURITY_CREDENTIALS, "admin"); | | ConnectionFactory connectionFactory1 = null; | Queue queue1 = null; | Context context1 = null; | | context1 = new InitialContext(properties1); | connectionFactory1 = (ConnectionFactory) context1 | .lookup("ConnectionFactory"); | // Make sure this queue has been configured on your jboss server | // (under ActiveMQ use "dynamicQueues/testQueue") | queue1 = (Queue) context1.lookup("/queue/tc1_q1"); | | ConnectionFactory connectionFactory2 = null; | Queue queue2 = null; | Context context2 = null; | | context2 = new InitialContext(properties2); | connectionFactory2 = (ConnectionFactory) context2 | .lookup("ConnectionFactory"); | // Make sure this queue has been configured on your jboss server | // (under ActiveMQ use "dynamicQueues/testQueue") | queue2 = (Queue) context2.lookup("/queue/tc1_q1"); | | MessageListener listener1 = new SimpleListener("Listener.1"); | ListenerManagerThread manager1 = new ListenerManagerThread( | connectionFactory1, queue1, listener1, "ListenerManager.1"); | manager1.start(); | | DispatcherThread dispatcher1 = new DispatcherThread(connectionFactory1, | queue1, "Dispatcher.1"); | dispatcher1.start(); | | MessageListener listener2 = new SimpleListener("Listener.2"); | ListenerManagerThread manager2 = new ListenerManagerThread( | connectionFactory2, queue2, listener2, "ListenerManager.2"); | manager2.start(); | | DispatcherThread dispatcher2 = new DispatcherThread(connectionFactory2, | queue2, "Dispatcher.2"); | dispatcher2.start(); | | // 10 minutes | Thread.sleep(600000); | | manager1.shutdown(); | manager1.join(); | | dispatcher1.shutdown(); | dispatcher1.join(); | | manager2.shutdown(); | manager2.join(); | | dispatcher2.shutdown(); | dispatcher2.join(); | | context1.close(); | context2.close(); | } | } | View the original post : http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4020399#4020399 
Reply to the post : http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4020399 _______________________________________________ jboss-user mailing list [email protected] https://lists.jboss.org/mailman/listinfo/jboss-user
