Hi,
In my application, occasionally, my consumers mis-behave and can consume
a very large amount of memory. I have found that occasionally, the
consumer's associated TcpTransport thread can be hit with an
OutOfMemoryException as a result - for example, when processing an
incoming message.
What I found was the TcpTransport thread just dumped the
OutOfMemoryError to stderr and exited without notifying any of its
transport listeners.
It seems to me TcpTransport should catch Throwable (like it does with
IOException), and indicate to the listeners that the transport is going
down. We often catch Throwable in other parts of the code, so it seems
appropriate we do the same here.
Without this change, I found my consumer, which recovers from its
OutOfMemoryError, gets stuck since its transport thread has died, for
example, it calls session.commit() and never returns:
at sun.misc.Unsafe.park(Native Method)
- waiting on
[EMAIL PROTECTED]
at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at
java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925)
at
java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317)
at
org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
at
org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:76)
at
org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1175)
at
org.apache.activemq.TransactionContext.commit(TransactionContext.java:259)
at org.apache.activemq.ActiveMQSession.commit(ActiveMQSession.java:494)
Here is my proposed patch. Incidentally,
stoppedLatch.get().countDown(); only needs to be in the finally block.
In the current code, if an IOException is thrown, this statement is
executed twice.
Index:
activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
===================================================================
---
activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
(revision 675380)
+++
activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
(working copy)
@@ -183,8 +183,13 @@
doRun();
}
} catch (IOException e) {
- stoppedLatch.get().countDown();
onException(e);
+ } catch (Throwable t) {
+ // Make sure drastic conditions, such as OutOfMemoryError
are still
+ // reported to the transport's onException() listeners.
+ IOException e = new IOException("Unexpected exception
occurred");
+ e.initCause(t);
+ onException(e);
} finally {
stoppedLatch.get().countDown();
}
--
Cheers,
David
Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia Ph: +61 2 9280 0699
Web: http://www.nuix.com Fax: +61 2 9212 6902