Following on from this thread: 
http://www.jboss.org/index.html?module=bb&op=viewtopic&t=102491 

I'm currently experiencing multiple failover issues with the 1.2.0.GA release. 
I'm running two clustered nodes on my local machine (JB4.0.4, Win XP, JVM1.4.2) 
using all the default settings, following the clustered node instructions in 
the user guide.

After starting both messaging-node0 and messaging-node1 I start my test case 
(attached).

The first problem I have with the test case that I created is that the message 
listener does not receive any of the dispatched messages (the test case creates 
a message dispatcher and message listener - the dispatcher sends a message to a 
queue that the listener is attached to). This happens regardless of the queue 
type (i.e. clustered/non-clustered - in this case testDistributedQueue or 
testQueue). 
The only way I can get the listener to start receiving messages is to kill one 
of the nodes e.g. kill node0.

Initially I thought my listener may have ended up on a different node to the 
dispatcher, so it could not see the messages that were being dispatched but I 
thought JBoss Messaging handles this scenario?

The second issue is that it's pretty easy to stop messages being dispatched and 
received altogether by randomly stopping and starting the individual nodes e.g. 
stop both nodes and bring one back up - my test case was unable to get a 
connection after both nodes had been shut down.

I'm interested to know if anyone is seeing similar behaviour.

Ben


  | import java.util.Hashtable;
  | 
  | import javax.jms.Connection;
  | import javax.jms.ConnectionFactory;
  | import javax.jms.ExceptionListener;
  | import javax.jms.JMSException;
  | import javax.jms.Message;
  | import javax.jms.MessageConsumer;
  | import javax.jms.MessageListener;
  | import javax.jms.MessageProducer;
  | import javax.jms.Queue;
  | import javax.jms.Session;
  | import javax.naming.Context;
  | import javax.naming.InitialContext;
  | 
  | import org.apache.commons.logging.Log;
  | import org.apache.commons.logging.LogFactory;
  | 
  | public class ReconnectTest {
  | 
  |     class DispatcherThread extends Thread {
  |             private ConnectionFactory connectionFactory;
  | 
  |             private String id;
  | 
  |             private boolean initialised = false;
  | 
  |             private Queue queue;
  | 
  |             private boolean recycle = false;
  | 
  |             private boolean shutdown = false;
  | 
  |             public DispatcherThread(ConnectionFactory connectionFactory,
  |                             Queue queue, String id) {
  |                     super();
  |                     this.connectionFactory = connectionFactory;
  |                     this.queue = queue;
  |                     this.id = id;
  |                     this.setName(id);
  |             }
  | 
  |             private boolean isRecycle() {
  |                     return recycle;
  |             }
  | 
  |             public void run() {
  |                     Connection connection = null;
  |                     Session session = null;
  |                     MessageProducer producer = null;
  |                     ExceptionListener exceptionListener = null;
  | 
  |                     while (!shutdown) {
  |                             if (!initialised) {
  |                                     try {
  |                                             connection = 
connectionFactory.createConnection();
  |                                             exceptionListener = new 
ExceptionListener() {
  |                                                     public void 
onException(JMSException ex) {
  |                                                             
LOG.error("Received connection exception", ex);
  |                                                             recycle = true;
  |                                                     }
  |                                             };
  |                                             
connection.setExceptionListener(exceptionListener);
  |                                             session = 
connection.createSession(false,
  |                                                             
Session.AUTO_ACKNOWLEDGE);
  |                                             producer = 
session.createProducer(queue);
  |                                             LOG.info(id + " initialised");
  |                                             initialised = true;
  |                                     } catch (JMSException ex) {
  |                                             LOG.error("Caught exception 
during initialisation", ex);
  |                                             recycle = true;
  |                                     }
  |                             }
  |                             if (isRecycle()) {
  |                                     JMSHelper.close(producer);
  |                                     JMSHelper.close(session);
  |                                     JMSHelper.close(connection);
  |                                     initialised = false;
  |                                     recycle = false;
  |                             }
  |                             if (initialised && (!recycle) && (!shutdown)) {
  |                                     try {
  |                                             Thread.sleep(1000);
  |                                             Message message = session
  |                                                             
.createTextMessage("This is a test");
  |                                             producer.send(message);
  |                                             LOG.info(id + " dispatched 
message");
  |                                     } catch (Exception ex) {
  |                                             LOG.error("Caught exception 
during send", ex);
  |                                             recycle = true;
  |                                     }
  |                             }
  |                     }
  |             }
  | 
  |             public void shutdown() {
  |                     LOG.info(id + " is shutting down");
  |                     recycle = true;
  |                     shutdown = true;
  |             }
  |     }
  | 
  |     static class JMSHelper {
  |             public static void close(Connection connection) {
  |                     if (connection != null) {
  |                             try {
  |                                     connection.close();
  |                             } catch (Exception ex) {
  |                                     LOG.error("Caught exception when 
closing connection", ex);
  |                             }
  |                             connection = null;
  |                     }
  |             }
  | 
  |             public static void close(MessageConsumer consumer) {
  |                     if (consumer != null) {
  |                             try {
  |                                     consumer.close();
  |                             } catch (Exception ex) {
  |                                     LOG.error("Caught exception when 
closing consumer", ex);
  |                             }
  |                     }
  |                     consumer = null;
  |             }
  | 
  |             public static void close(MessageProducer producer) {
  |                     if (producer != null) {
  |                             try {
  |                                     producer.close();
  |                             } catch (Exception ex) {
  |                                     LOG.error("Caught exception when 
closing producer", ex);
  |                             }
  |                     }
  |                     producer = null;
  |             }
  | 
  |             public static void close(Session session) {
  |                     if (session != null) {
  |                             try {
  |                                     session.close();
  |                             } catch (Exception ex) {
  |                                     LOG.error("Caught exception when 
closing session", ex);
  |                             }
  |                     }
  |                     session = null;
  |             }
  |     }
  | 
  |     class ListenerManagerThread extends Thread {
  |             private ConnectionFactory connectionFactory;
  | 
  |             private String id;
  | 
  |             private boolean initialised = false;
  | 
  |             private MessageListener messageListener;
  | 
  |             private Queue queue;
  | 
  |             private boolean recycle = false;
  | 
  |             private boolean shutdown = false;
  | 
  |             public ListenerManagerThread(ConnectionFactory 
connectionFactory,
  |                             Queue queue, MessageListener messageListener, 
String id) {
  |                     super();
  |                     this.connectionFactory = connectionFactory;
  |                     this.queue = queue;
  |                     this.messageListener = messageListener;
  |                     this.id = id;
  |                     this.setName(id);
  |             }
  | 
  |             private boolean isRecycle() {
  |                     return recycle;
  |             }
  | 
  |             public void run() {
  |                     Connection connection = null;
  |                     Session session = null;
  |                     MessageConsumer consumer = null;
  |                     ExceptionListener exceptionListener = null;
  | 
  |                     while (!shutdown) {
  |                             if (!initialised) {
  |                                     try {
  |                                             connection = 
connectionFactory.createConnection();
  |                                             exceptionListener = new 
ExceptionListener() {
  |                                                     public void 
onException(JMSException ex) {
  |                                                             
LOG.error("Received connection exception", ex);
  |                                                             recycle = true;
  |                                                     }
  |                                             };
  |                                             
connection.setExceptionListener(exceptionListener);
  |                                             session = 
connection.createSession(false,
  |                                                             
Session.AUTO_ACKNOWLEDGE);
  |                                             consumer = 
session.createConsumer(queue);
  |                                             
consumer.setMessageListener(messageListener);
  |                                             connection.start();
  |                                             LOG.info(id + " initialised");
  |                                             initialised = true;
  |                                     } catch (JMSException ex) {
  |                                             LOG.error("Caught exception 
during initialisation", ex);
  |                                             recycle = true;
  |                                     }
  |                             }
  |                             if (isRecycle()) {
  |                                     JMSHelper.close(consumer);
  |                                     JMSHelper.close(session);
  |                                     JMSHelper.close(connection);
  |                                     initialised = false;
  |                                     recycle = false;
  |                             }
  |                             try {
  |                                     Thread.sleep(1000);
  |                             } catch (InterruptedException ex) {
  |                                     LOG.error("Caught exception during 
sleep");
  |                             }
  |                     }
  |             }
  | 
  |             public void shutdown() {
  |                     LOG.info(id + " is shutting down");
  |                     recycle = true;
  |                     shutdown = true;
  |             }
  |     }
  | 
  |     class SimpleListener implements MessageListener {
  | 
  |             private String id;
  | 
  |             public SimpleListener(String id) {
  |                     super();
  |                     this.id = id;
  |             }
  | 
  |             public void onMessage(Message message) {
  |                     LOG.info(id + " received message");
  |             }
  | 
  |     }
  | 
  |     private static final Log LOG = LogFactory.getLog(ReconnectTest.class);
  | 
  |     public static void main(String[] args) {
  |             ReconnectTest test = new ReconnectTest();
  | 
  |             try {
  |                     test.start();
  |             } catch (Throwable ex) {
  |                     LOG.error("Caught exception in main", ex);
  |             }
  |     }
  | 
  |     private void start() throws Exception {
  |             // Setup connection 1
  |             Hashtable properties1 = new Hashtable();
  |             properties1.put(Context.INITIAL_CONTEXT_FACTORY,
  |                             "org.jnp.interfaces.NamingContextFactory");
  |             properties1.put(Context.URL_PKG_PREFIXES,
  |                             "org.jboss.naming:org.jnp.interfaces");
  |             properties1.put(Context.PROVIDER_URL, "jnp://localhost:1099");
  |             properties1.put(Context.SECURITY_PRINCIPAL, "admin");
  |             properties1.put(Context.SECURITY_CREDENTIALS, "admin");
  | 
  |             ConnectionFactory connectionFactory1 = null;
  |             Queue queue1 = null;
  |             Context context1 = null;
  | 
  |             context1 = new InitialContext(properties1);
  |             connectionFactory1 = (ConnectionFactory) context1
  |                             .lookup("ConnectionFactory");
  |             queue1 = (Queue) context1.lookup("/queue/testDistributedQueue");
  | 
  |             MessageListener listener1 = new SimpleListener("Listener.1");
  |             ListenerManagerThread manager1 = new ListenerManagerThread(
  |                             connectionFactory1, queue1, listener1, 
"ListenerManager.1");
  |             manager1.start();
  | 
  |             DispatcherThread dispatcher1 = new 
DispatcherThread(connectionFactory1,
  |                             queue1, "Dispatcher.1");
  |             dispatcher1.start();
  | 
  |             Thread.sleep(Long.MAX_VALUE);
  | 
  |             manager1.shutdown();
  |             manager1.join();
  | 
  |             dispatcher1.shutdown();
  |             dispatcher1.join();
  | 
  |             context1.close();
  |     }
  | }
  | 

View the original post : 
http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4024981#4024981

Reply to the post : 
http://www.jboss.com/index.html?module=bb&op=posting&mode=reply&p=4024981
_______________________________________________
jboss-user mailing list
[email protected]
https://lists.jboss.org/mailman/listinfo/jboss-user

Reply via email to