Hi,

In my application, occasionally, my consumers mis-behave and can consume a very large amount of memory. I have found that occasionally, the consumer's associated TcpTransport thread can be hit with an OutOfMemoryException as a result - for example, when processing an incoming message.

What I found was the TcpTransport thread just dumped the OutOfMemoryError to stderr and exited without notifying any of its transport listeners.

It seems to me TcpTransport should catch Throwable (like it does with IOException), and indicate to the listeners that the transport is going down. We often catch Throwable in other parts of the code, so it seems appropriate we do the same here.

Without this change, I found my consumer, which recovers from its OutOfMemoryError, gets stuck since its transport thread has died, for example, it calls session.commit() and never returns:

        at sun.misc.Unsafe.park(Native Method)
- waiting on [EMAIL PROTECTED]
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:158)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1925) at java.util.concurrent.ArrayBlockingQueue.take(ArrayBlockingQueue.java:317) at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40) at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:76) at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1175) at org.apache.activemq.TransactionContext.commit(TransactionContext.java:259)
        at org.apache.activemq.ActiveMQSession.commit(ActiveMQSession.java:494)


Here is my proposed patch. Incidentally, stoppedLatch.get().countDown(); only needs to be in the finally block. In the current code, if an IOException is thrown, this statement is executed twice.

Index: activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java
===================================================================
--- activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (revision 675380) +++ activemq-core/src/main/java/org/apache/activemq/transport/tcp/TcpTransport.java (working copy)
@@ -183,8 +183,13 @@
                 doRun();
             }
         } catch (IOException e) {
-            stoppedLatch.get().countDown();
             onException(e);
+        } catch (Throwable t) {
+ // Make sure drastic conditions, such as OutOfMemoryError are still
+            // reported to the transport's onException() listeners.
+ IOException e = new IOException("Unexpected exception occurred");
+            e.initCause(t);
+            onException(e);
         } finally {
             stoppedLatch.get().countDown();
         }

--
Cheers,
David

Nuix Pty Ltd
Suite 79, 89 Jones St, Ultimo NSW 2007, Australia    Ph: +61 2 9280 0699
Web: http://www.nuix.com                            Fax: +61 2 9212 6902

Reply via email to