Hello.
Some time ago we started suffering deadlock problems in our system, which
uses ActiveMQ 4.1 for its messaging needs.
At first I thought the problem was caused by the consumer threads, since
they write to the transport socket (to send acks or to commit consumed
messages), so I considered that enabling asyncDispatch could solve it.
After this "solution" failed completely (the deadlock kept happening), I
reconsidered the scenario and a new theory arose: the problem is mostly
related to the ActiveMQ transport thread. This is the thread that reads
from the socket, but on some occasions it also writes to it, as we can see
in the org.apache.activemq.broker.TransportConnection code:
    this.transport.setTransportListener(new DefaultTransportListener() {
        public void onCommand(Object o) {
            Command command = (Command) o;
            Response response = service(command);
            if (response != null) {
                dispatchSync(response);
            }
        }

        public void onException(IOException exception) {
            serviceTransportException(exception);
        }
    });
So any serviced command that returns a response forces the transport
listener to write to the socket, in the dispatchSync call. To do so, it
has to lock the MutexTransport. If, at that very moment, the socket buffer
is filling up and one of the consumer threads is holding the
MutexTransport, the deadlock happens (the transport thread can also fall
into the deadlock by itself if its own write attempt fills the buffer).
There's no way to recover from this situation, since the only thread
that could read from the socket is trying to get the MutexTransport
lock, and the thread holding it will never release it until its
socketWrite0 call finishes. Since nobody is reading, this will never happen.
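To make the cycle above concrete, here is a minimal Java simulation of the theory (not of ActiveMQ itself). It assumes, as the explanation does, that the socket buffer can be modeled as a bounded queue that only drains when the transport thread gets back to its read loop. A `ReentrantLock` stands in for MutexTransport and an `ArrayBlockingQueue` for the socket buffer; the class and method names are hypothetical. The consumer thread takes the lock and blocks writing into the full buffer, while the transport thread (playing dispatchSync) waits for the same lock and so never returns to reading.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class SyncDispatchDeadlockSketch {

    /** Returns true if the simulated transport thread cannot get the write mutex within timeoutMs. */
    static boolean transportThreadStuck(long timeoutMs) throws InterruptedException {
        ReentrantLock writeMutex = new ReentrantLock();                   // plays MutexTransport
        BlockingQueue<String> socketBuffer = new ArrayBlockingQueue<>(1); // plays the socket buffer
        socketBuffer.put("earlier frame");                                // buffer is already full
        CountDownLatch consumerHoldsLock = new CountDownLatch(1);

        // Consumer thread: takes the mutex, then its write blocks because the buffer is full.
        Thread consumer = new Thread(() -> {
            writeMutex.lock();
            try {
                consumerHoldsLock.countDown();
                socketBuffer.put("ack"); // blocks until someone drains the buffer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                writeMutex.unlock();
            }
        });
        consumer.start();
        consumerHoldsLock.await();

        // Transport thread (this thread): dispatchSync needs the mutex before it can go back
        // to reading, and only reading would drain the buffer, so this wait cannot succeed.
        boolean gotMutex = writeMutex.tryLock(timeoutMs, TimeUnit.MILLISECONDS);
        if (gotMutex) {
            writeMutex.unlock();
        }

        // Demo-only escape hatch so the program terminates; the real scenario has none.
        consumer.interrupt();
        consumer.join();
        return !gotMutex;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("transport thread stuck: " + transportThreadStuck(200)); // prints "transport thread stuck: true"
    }
}
```

The `interrupt()` call exists only so the example ends; in the situation described above there is no such way out, which matches the consumers staying stuck forever.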
Do you agree with this explanation? Did I miss something?
Is this any better in the 5.x series?
Do you think that passing a TaskRunnerFactory to the TransportConnection
constructor and changing that call from dispatchSync to dispatchAsync
could avoid the deadlock?
Is there any drawback to this approach?
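Under the same assumptions, the proposed dispatchAsync change can be sketched as follows. An `ExecutorService` stands in for the TaskRunnerFactory (whose real API is not used here), and the class and method names are again hypothetical. The transport thread hands its response to the dispatcher instead of writing it inline, then goes straight back to draining the buffer; the consumer's write completes, the mutex is released, and the queued response is written afterwards.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

public class AsyncDispatchSketch {

    /** Returns true if both the consumer's write and the transport thread's response complete. */
    static boolean allWritesComplete(long timeoutMs) throws Exception {
        ReentrantLock writeMutex = new ReentrantLock();                   // plays MutexTransport
        BlockingQueue<String> socketBuffer = new ArrayBlockingQueue<>(1); // plays the socket buffer
        socketBuffer.put("earlier frame");                                // buffer is already full
        CountDownLatch consumerHoldsLock = new CountDownLatch(1);
        ExecutorService consumer = Executors.newSingleThreadExecutor();
        ExecutorService dispatcher = Executors.newSingleThreadExecutor(); // stand-in for a TaskRunnerFactory
        try {
            // Consumer thread: holds the mutex while its write waits for buffer space.
            Future<Boolean> consumerWrite = consumer.submit(() -> {
                writeMutex.lock();
                try {
                    consumerHoldsLock.countDown();
                    return socketBuffer.offer("ack", timeoutMs, TimeUnit.MILLISECONDS);
                } finally {
                    writeMutex.unlock();
                }
            });
            consumerHoldsLock.await();

            // Transport thread (this thread): queue the response (dispatchAsync)
            // instead of writing it inline (dispatchSync)...
            Future<Boolean> responseWrite = dispatcher.submit(() -> {
                writeMutex.lock();
                try {
                    return socketBuffer.offer("response", timeoutMs, TimeUnit.MILLISECONDS);
                } finally {
                    writeMutex.unlock();
                }
            });

            // ...and return to the read loop, which keeps draining the buffer.
            long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(2 * timeoutMs);
            while (!(consumerWrite.isDone() && responseWrite.isDone()) && System.nanoTime() < deadline) {
                socketBuffer.poll(10, TimeUnit.MILLISECONDS);
            }
            return consumerWrite.get() && responseWrite.get();
        } finally {
            consumer.shutdownNow();
            dispatcher.shutdownNow();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("all writes complete: " + allWritesComplete(1000));
    }
}
```

In this model the reader never waits on the write mutex, so the cycle is broken. The sketch also hints at the trade-off: writes still block while the buffer is full, they just block on the dispatcher thread instead of the reader, and pending responses can pile up in the dispatcher's queue in the meantime.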
Thanks for your time. Any feedback would be much appreciated, since this
problem is stopping our production systems. Once it happens, the
consumers on the affected connection are stuck forever.
Best regards.
Extra bonus: stack traces. Here is a transport thread stuck in
socketWrite0. Nobody else can write to the socket, and this thread won't
be able to read from it, since it is blocked writing:

"ActiveMQ Transport: tcp:///127.0.0.1:17891" daemon prio=10 tid=0x00c4af30 nid=0x48 runnable [0x2dcff000..0x2dcff9f0]
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:109)
    at java.io.DataOutputStream.flush(Unknown Source)
    at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:119)
    at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:145)
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:80)
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:93)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:47)
    - locked <0x3d5625c0> (a java.lang.Object)
    at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1138)
    at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:805)
    at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:770)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:187)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:65)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:133)
    at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:124)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:137)
    at java.lang.Thread.run(Unknown Source)