Hello, we're using JBoss 4.0.4GA with Messaging 1.0.1.SP4.
I've been testing a basic failure scenario where our application is forced to
reconnect to JBoss Messaging after experiencing a JBoss connectivity problem.
In theory, when a JBoss connectivity problem is detected (via an
ExceptionListener that we set on the JMS Connection) we should be able to shut
down all our message producers and consumers and recreate them using a new
Connection - or at least keep trying to do so until a connection is obtained.
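The "keep trying until a connection is obtained" part can be sketched
independently of JMS. This is only a rough sketch: ReconnectRetry, retry,
maxAttempts and delayMillis are illustrative names, not part of JBoss
Messaging or of our application.

```java
import java.util.concurrent.Callable;

public class ReconnectRetry {

    /**
     * Calls factory until it succeeds or maxAttempts is reached, sleeping
     * delayMillis between failed attempts. Rethrows the last failure.
     */
    public static <T> T retry(Callable<T> factory, int maxAttempts,
            long delayMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return factory.call();
            } catch (Exception ex) {
                last = ex;
                if (attempt < maxAttempts) {
                    // Back off before the next attempt
                    Thread.sleep(delayMillis);
                }
            }
        }
        throw last;
    }
}
```

In our case the Callable would wrap connectionFactory.createConnection() and
the session/producer/consumer setup that follows it.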
What I'm finding is that about half of the time our producers and consumers can
successfully reconnect. Frequently, however, they appear to become 'stuck' in
the JBoss client when attempting to tidy up after the problem with the
Connection is detected. By tidy up I mean we attempt to close our
producers/consumers, sessions and finally the connections themselves before
starting over and recreating everything (i.e. the connection, session etc.).
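One possible way to stop the tidy-up from hanging the whole reconnect loop is
to run each close() on a helper thread and give up waiting after a timeout.
This is an untested idea as far as JBoss Messaging goes, not a fix:
BoundedClose and closeWithTimeout are hypothetical names, and a thread blocked
in a plain socket read will typically ignore the interrupt, so the helper
thread may simply be abandoned while the caller carries on.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class BoundedClose {

    // Daemon threads, so a close() that never returns cannot keep the JVM alive
    private static final ExecutorService CLOSER =
            Executors.newCachedThreadPool(new ThreadFactory() {
                public Thread newThread(Runnable r) {
                    Thread t = new Thread(r, "bounded-close");
                    t.setDaemon(true);
                    return t;
                }
            });

    /**
     * Runs closeAction on a helper thread and waits at most timeoutMillis.
     * Returns true if the action finished in time (even if it threw),
     * false if the wait timed out or was interrupted.
     */
    public static boolean closeWithTimeout(Runnable closeAction,
            long timeoutMillis) {
        Future<?> future = CLOSER.submit(closeAction);
        try {
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);
            return true;
        } catch (TimeoutException ex) {
            // Best effort only: a blocked socket read may not respond
            future.cancel(true);
            return false;
        } catch (ExecutionException ex) {
            // close() completed, but with an exception
            return true;
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The Runnable would wrap a single call such as consumer.close() or
connection.close(); a false return means the old objects should be dropped
and new ones created regardless.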
I've written a test case to demonstrate this. What the test case does is spawn
two threads. One thread is used for dispatching messages. This thread creates
its own connection, session and message producer. The other thread is used for
message listening. This thread also creates its own connection, session and
message consumer (with message listener). The dispatcher sends messages to the
same queue the listener has been configured on.
Before running the test case, configure the queue name to be used.
Note that the test case has a dependency on log4j and commons logging so you'll
need those jars handy.
Start JBoss.
When you run the test case, the dispatcher will start sending messages which
will be received by the listener.
You should see the following output being repeated over and over:
| 17:39:36,187 INFO @Dispatcher.1 [MultipleServerTest] Dispatcher.1 dispatched message
| 17:39:36,187 INFO @Thread-12 [MultipleServerTest] Listener.1 received message
| 17:39:37,203 INFO @Dispatcher.1 [MultipleServerTest] Dispatcher.1 dispatched message
| 17:39:37,203 INFO @Thread-12 [MultipleServerTest] Listener.1 received message
|
Stop and restart your JBoss server - what *should* happen each time is that
the dispatcher and listener recover and start sending/receiving messages again.
What I'm frequently seeing is that either the dispatcher or listener or
sometimes both do not recover. You may need to stop and start the server
several times before the behaviour is observed.
The listener/dispatcher threads seem to be getting stuck here:
| Thread [ListenerManager.1] (Suspended)
| SocketInputStream.socketRead0(FileDescriptor, byte[], int, int, int) line: not available [native method]
| SocketInputStream.read(byte[], int, int) line: 129
| BufferedInputStream.fill() line: 183
| BufferedInputStream.read() line: 201
| DataInputStream(FilterInputStream).read() line: 66
| JBossObjectInputStream.read() line: 193
| SocketClientInvoker(MicroSocketClientInvoker).readVersion(InputStream) line: 902
| SocketClientInvoker(MicroSocketClientInvoker).transport(String, Object, Map, Marshaller, UnMarshaller) line: 552
| SocketClientInvoker(MicroRemoteClientInvoker).invoke(InvocationRequest) line: 122
| Client.invoke(Object, Map, InvokerLocator) line: 1414
| Client.invoke(Object, Map) line: 511
| ClientConsumerDelegate(DelegateSupport).invoke(Invocation) line: 111
| ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
| ConsumerAspect.handleClosing(Invocation) line: 108
| ConsumerAspect25.invoke(Invocation) line: not available
| ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
| ClosedInterceptor.invoke(Invocation) line: 182
| PerInstanceInterceptor.invoke(Invocation) line: 117
| ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
| ExceptionInterceptor.invoke(Invocation) line: 69
| ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
| ClientLogInterceptor.invoke(Invocation) line: 107
| ClientConsumerDelegate$closing_4945873952494833124.invokeNext() line: not available
| ClientConsumerDelegate.closing() line: not available
| JBossMessageConsumer.close() line: 96
| MultipleServerTest$ListenerManagerThread.run() line: 190
|
and here:
| Thread [Dispatcher.1] (Suspended)
| SocketInputStream.socketRead0(FileDescriptor, byte[], int, int, int) line: not available [native method]
| SocketInputStream.read(byte[], int, int) line: 129
| BufferedInputStream.fill() line: 183
| BufferedInputStream.read() line: 201
| DataInputStream(FilterInputStream).read() line: 66
| JBossObjectInputStream.read() line: 193
| SocketClientInvoker(MicroSocketClientInvoker).readVersion(InputStream) line: 902
| SocketClientInvoker(MicroSocketClientInvoker).transport(String, Object, Map, Marshaller, UnMarshaller) line: 552
| SocketClientInvoker(MicroRemoteClientInvoker).invoke(InvocationRequest) line: 122
| Client.invoke(Object, Map, InvokerLocator) line: 1414
| Client.invoke(Object, Map) line: 511
| ClientConnectionDelegate(DelegateSupport).invoke(Invocation) line: 111
| ClientConnectionDelegate$closing_4945873952494833124.invokeNext() line: not available
| ClosedInterceptor.invoke(Invocation) line: 182
| PerInstanceInterceptor.invoke(Invocation) line: 117
| ClientConnectionDelegate$closing_4945873952494833124.invokeNext() line: not available
| ExceptionInterceptor.invoke(Invocation) line: 69
| ClientConnectionDelegate$closing_4945873952494833124.invokeNext() line: not available
| ClientLogInterceptor.invoke(Invocation) line: 107
| ClientConnectionDelegate$closing_4945873952494833124.invokeNext() line: not available
| ClientConnectionDelegate.closing() line: not available
| JBossConnection.close() line: 131
| MultipleServerTest$DispatcherThread.run() line: 99
|
I also see errors in the JBoss log like:
| jvm 1 | 14:58:07,906 ERROR [ServerThread] SocketServerInvoker[0.0.0.0:4489].invoke() call failed: Object with oid: -2147483646 was not found in the Dispatcher
| jvm 1 | 14:58:07,906 ERROR [ServerThread] SocketServerInvoker[0.0.0.0:4489].invoke() call failed: Object with oid: -2147483642 was not found in the Dispatcher
| jvm 1 | 14:58:07,921 ERROR [STDERR] java.lang.ClassCircularityError: org/jboss/messaging/core/plugin/IdBlock
| jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.jms.server.remoting.JMSWireFormat.write(JMSWireFormat.java:342)
| jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.versionedWrite(ServerThread.java:778)
| jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.processInvocation(ServerThread.java:585)
| jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.dorun(ServerThread.java:363)
| jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.run(ServerThread.java:159)
| jvm 1 | 14:58:07,921 ERROR [STDERR] java.lang.ClassCircularityError: org/jboss/messaging/core/plugin/IdBlock
| jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.jms.server.remoting.JMSWireFormat.write(JMSWireFormat.java:342)
| jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.versionedWrite(ServerThread.java:778)
| jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.processInvocation(ServerThread.java:585)
| jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.dorun(ServerThread.java:363)
| jvm 1 | 14:58:07,921 ERROR [STDERR] at org.jboss.remoting.transport.socket.ServerThread.run(ServerThread.java:159)
|
Anyway, here is my test case:
| import java.util.Hashtable;
|
| import javax.jms.Connection;
| import javax.jms.ConnectionFactory;
| import javax.jms.ExceptionListener;
| import javax.jms.JMSException;
| import javax.jms.Message;
| import javax.jms.MessageConsumer;
| import javax.jms.MessageListener;
| import javax.jms.MessageProducer;
| import javax.jms.Queue;
| import javax.jms.Session;
| import javax.naming.Context;
| import javax.naming.InitialContext;
|
| import org.apache.commons.logging.Log;
| import org.apache.commons.logging.LogFactory;
|
| public class RecoverTest {
|
|     class DispatcherThread extends Thread {
|         private ConnectionFactory connectionFactory;
|
|         private String id;
|
|         private boolean initialised = false;
|
|         private Queue queue;
|
|         // volatile: set by the JMS ExceptionListener thread and by shutdown(),
|         // read by the run() loop
|         private volatile boolean recycle = false;
|
|         private volatile boolean shutdown = false;
|
|         public DispatcherThread(ConnectionFactory connectionFactory,
|                 Queue queue, String id) {
|             super();
|             this.connectionFactory = connectionFactory;
|             this.queue = queue;
|             this.id = id;
|             this.setName(id);
|         }
|
|         private boolean isRecycle() {
|             return recycle;
|         }
|
|         public void run() {
|             Connection connection = null;
|             Session session = null;
|             MessageProducer producer = null;
|             ExceptionListener exceptionListener = null;
|
|             while (!shutdown) {
|                 // (Re)create connection, session and producer
|                 if (!initialised) {
|                     try {
|                         connection = connectionFactory.createConnection();
|                         exceptionListener = new ExceptionListener() {
|                             public void onException(JMSException ex) {
|                                 LOG.error("Received connection exception", ex);
|                                 recycle = true;
|                             }
|                         };
|                         connection.setExceptionListener(exceptionListener);
|                         session = connection.createSession(false,
|                                 Session.AUTO_ACKNOWLEDGE);
|                         producer = session.createProducer(queue);
|                         LOG.info(id + " initialised");
|                         initialised = true;
|                     } catch (JMSException ex) {
|                         LOG.error("Caught exception during initialisation", ex);
|                         recycle = true;
|                     }
|                 }
|                 // Tidy up after a failure: producer, then session, then connection
|                 if (isRecycle()) {
|                     if (producer != null) {
|                         try {
|                             producer.close();
|                         } catch (Exception ex) {
|                             LOG.error("Caught exception during producer close",
|                                     ex);
|                         }
|                     }
|                     if (session != null) {
|                         try {
|                             session.close();
|                         } catch (Exception ex) {
|                             LOG.error("Caught exception during session close",
|                                     ex);
|                         }
|                     }
|                     if (connection != null) {
|                         try {
|                             // the dispatcher thread has been seen to block in this call
|                             connection.close();
|                         } catch (Exception ex) {
|                             LOG.error("Caught exception during connection close",
|                                     ex);
|                         }
|                     }
|                     producer = null;
|                     session = null;
|                     connection = null;
|                     initialised = false;
|                     recycle = false;
|                 }
|                 // Send one message per second while healthy
|                 if (initialised && (!recycle) && (!shutdown)) {
|                     try {
|                         Thread.sleep(1000);
|                         Message message = session
|                                 .createTextMessage("This is a test");
|                         producer.send(message);
|                         LOG.info(id + " dispatched message");
|                     } catch (Exception ex) {
|                         LOG.error("Caught exception during send", ex);
|                         recycle = true;
|                     }
|                 }
|             }
|         }
|
|         public void shutdown() {
|             LOG.info(id + " is shutting down");
|             recycle = true;
|             shutdown = true;
|         }
|     }
|
|     class ListenerManagerThread extends Thread implements ExceptionListener {
|         private ConnectionFactory connectionFactory;
|
|         private String id;
|
|         private boolean initialised = false;
|
|         private MessageListener messageListener;
|
|         private Queue queue;
|
|         // volatile: set by the JMS ExceptionListener thread and by shutdown(),
|         // read by the run() loop
|         private volatile boolean recycle = false;
|
|         private volatile boolean shutdown = false;
|
|         public ListenerManagerThread(ConnectionFactory connectionFactory,
|                 Queue queue, MessageListener messageListener, String id) {
|             super();
|             this.connectionFactory = connectionFactory;
|             this.queue = queue;
|             this.messageListener = messageListener;
|             this.id = id;
|             this.setName(id);
|         }
|
|         private boolean isRecycle() {
|             return recycle;
|         }
|
|         public void onException(JMSException ex) {
|             LOG.error("Received connection exception", ex);
|             recycle = true;
|         }
|
|         public void run() {
|             Connection connection = null;
|             Session session = null;
|             MessageConsumer consumer = null;
|
|             while (!shutdown) {
|                 // (Re)create connection, session and consumer
|                 if (!initialised) {
|                     try {
|                         connection = connectionFactory.createConnection();
|                         connection.setExceptionListener(this);
|                         session = connection.createSession(false,
|                                 Session.AUTO_ACKNOWLEDGE);
|                         consumer = session.createConsumer(queue);
|                         consumer.setMessageListener(messageListener);
|                         connection.start();
|                         LOG.info(id + " initialised");
|                         initialised = true;
|                     } catch (JMSException ex) {
|                         LOG.error("Caught exception during initialisation", ex);
|                         recycle = true;
|                     }
|                 }
|                 // Tidy up after a failure: consumer, then session, then connection
|                 if (isRecycle()) {
|                     if (consumer != null) {
|                         try {
|                             consumer.setMessageListener(null);
|                             // the listener thread has been seen to block in this call
|                             consumer.close();
|                         } catch (Exception ex) {
|                             LOG.error("Caught exception during consumer close",
|                                     ex);
|                         }
|                     }
|                     if (session != null) {
|                         try {
|                             session.close();
|                         } catch (Exception ex) {
|                             LOG.error("Caught exception during session close",
|                                     ex);
|                         }
|                     }
|                     if (connection != null) {
|                         try {
|                             connection.close();
|                         } catch (Exception ex) {
|                             LOG.error("Caught exception during connection close",
|                                     ex);
|                         }
|                     }
|                     consumer = null;
|                     session = null;
|                     connection = null;
|                     initialised = false;
|                     recycle = false;
|                 }
|                 try {
|                     Thread.sleep(1000);
|                 } catch (InterruptedException ex) {
|                     LOG.error("Caught exception during sleep");
|                 }
|             }
|         }
|
|         public void shutdown() {
|             LOG.info(id + " is shutting down");
|             recycle = true;
|             shutdown = true;
|         }
|     }
|
|     class SimpleListener implements MessageListener {
|
|         private String id;
|
|         public SimpleListener(String id) {
|             super();
|             this.id = id;
|         }
|
|         /**
|          * @see javax.jms.MessageListener#onMessage(javax.jms.Message)
|          */
|         public void onMessage(Message message) {
|             LOG.info(id + " received message");
|         }
|
|     }
|
|     private static final Log LOG = LogFactory.getLog(RecoverTest.class);
|
|     public static void main(String[] args) {
|         RecoverTest test = new RecoverTest();
|
|         try {
|             test.start();
|         } catch (Throwable ex) {
|             LOG.error("Caught exception in main", ex);
|         }
|     }
|
|     private void start() throws Exception {
|         // Setup connection 1
|         Hashtable properties1 = new Hashtable();
|         properties1.put(Context.INITIAL_CONTEXT_FACTORY,
|                 "org.jnp.interfaces.NamingContextFactory");
|         properties1.put(Context.URL_PKG_PREFIXES,
|                 "org.jboss.naming:org.jnp.interfaces");
|         properties1.put(Context.PROVIDER_URL, "jnp://localhost:1099");
|         properties1.put(Context.SECURITY_PRINCIPAL, "admin");
|         properties1.put(Context.SECURITY_CREDENTIALS, "admin");
|
|         ConnectionFactory connectionFactory1 = null;
|         Queue queue1 = null;
|         Context context1 = null;
|
|         context1 = new InitialContext(properties1);
|         connectionFactory1 = (ConnectionFactory) context1
|                 .lookup("ConnectionFactory");
|         queue1 = (Queue) context1.lookup("/queue/testQueue");
|
|         MessageListener listener1 = new SimpleListener("Listener.1");
|         ListenerManagerThread manager1 = new ListenerManagerThread(
|                 connectionFactory1, queue1, listener1, "ListenerManager.1");
|         manager1.start();
|
|         DispatcherThread dispatcher1 = new DispatcherThread(connectionFactory1,
|                 queue1, "Dispatcher.1");
|         dispatcher1.start();
|
|         Thread.sleep(600000);
|
|         manager1.shutdown();
|         manager1.join();
|         dispatcher1.shutdown();
|         dispatcher1.join();
|         context1.close();
|     }
| }
|
View the original post :
http://www.jboss.com/index.html?module=bb&op=viewtopic&p=4019739#4019739