Filip Hanik - Dev Lists wrote:
hi Manuel,
I may not be understanding your theory completely, but if I do, I'd have
to disagree with parts of your assessment:
the problem you describe doesn't really have anything to do with
blocking vs. non-blocking IO. Instead, it's the implementation on top of
the socket API.
Taking a simple Java program, you can read from and write to blocking
sockets simultaneously.
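Filip's point can be checked with a minimal sketch using plain java.net sockets (this is not ActiveMQ code; the class and method names are hypothetical). Each end of a loopback connection gets one thread that writes 32 MB and a separate thread that reads 32 MB. Because reading and writing happen on different threads, both transfers complete even though the payload is larger than what the kernel typically buffers.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class FullDuplexDemo {
    static final int BYTES = 32 * 1024 * 1024;   // more than typical loopback socket buffers hold

    /** Writes `total` bytes to the socket in 64 KB chunks using blocking I/O. */
    static void pump(Socket s, int total) {
        try {
            OutputStream out = s.getOutputStream();
            byte[] chunk = new byte[64 * 1024];
            for (int sent = 0; sent < total; sent += chunk.length) {
                out.write(chunk, 0, Math.min(chunk.length, total - sent));
            }
            out.flush();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads until `total` bytes have arrived or the stream ends; returns the byte count. */
    static long drain(Socket s, int total) {
        try {
            InputStream in = s.getInputStream();
            byte[] buf = new byte[64 * 1024];
            long got = 0;
            while (got < total) {
                int n = in.read(buf);
                if (n < 0) break;
                got += n;
            }
            return got;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Both ends write BYTES while, on separate threads, both ends read BYTES. */
    static long[] run() {
        InetAddress lo = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, lo);
             Socket client = new Socket(lo, server.getLocalPort());
             Socket broker = server.accept()) {
            long[] received = new long[2];
            Thread[] threads = {
                new Thread(() -> pump(client, BYTES)),
                new Thread(() -> pump(broker, BYTES)),
                new Thread(() -> received[0] = drain(client, BYTES)),
                new Thread(() -> received[1] = drain(broker, BYTES)),
            };
            for (Thread t : threads) t.start();
            for (Thread t : threads) t.join();   // join() also makes received[] visible here
            return received;
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        long[] r = run();
        System.out.println("client got " + r[0] + " bytes, broker got " + r[1] + " bytes");
    }
}
```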
Hello Filip, and thanks for your comments. Actually, yes, you must be
able to read and write simultaneously on a given socket. Sorry for not being
clear enough in my explanation. I've taken a deeper look into the
details, and the actual problem is that all my consumer threads get
blocked: one trying to write to the socket, and the others trying to
acquire the MutexTransport lock in the transport filter chain. What
they are actually trying to do is ack some already-sent messages. The
stack looks like this:
"Session(recv,TaskManagerQueue)#56" prio=10 tid=0x00b4a6d8 nid=0x6a runnable [0xeb9fe000..0xeb9ffaa8]
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(Unknown Source)
	at java.net.SocketOutputStream.write(Unknown Source)
	at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:109)
	at java.io.DataOutputStream.flush(Unknown Source)
	at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:119)
	at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:145)
	at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:80)
	at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:93)
	at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:392)
	- locked <0x2abab308> (a java.lang.Object)
	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:47)
	- locked <0x2aba90f0> (a java.lang.Object)
	at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
	at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1155)
	at org.apache.activemq.TransactionContext.begin(TransactionContext.java:201)
	at org.apache.activemq.ActiveMQSession.doStartTransaction(ActiveMQSession.java:1564)
	at org.apache.activemq.ActiveMQMessageConsumer.ackLater(ActiveMQMessageConsumer.java:699)
	at org.apache.activemq.ActiveMQMessageConsumer.beforeMessageIsConsumed(ActiveMQMessageConsumer.java:651)
	at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:487)
That is the thread holding the lock, and it never returns from
java.net.SocketOutputStream.socketWrite0. What I understand from that
stack is that it is trying to ack some messages before actually consuming
the message (ackLater() starts a transaction, which sends a command to the
broker), so writing to the socket is involved. The thread is
runnable, but since the socket buffer is full, it is not able to continue.
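The "runnable but never returns from socketWrite0" symptom can be reproduced with a minimal sketch in plain java.net (not ActiveMQ code; the class and method names are hypothetical): a connection is accepted but the accepting side never reads, and a writer thread keeps writing. Once the sender's and receiver's kernel buffers are full, a blocking write() simply does not return.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicLong;

final class BlockedWriteDemo {
    static final AtomicLong written = new AtomicLong();

    /** Returns true if the writer thread is still inside write() after waitMillis. */
    static boolean writerStillBlocked(long waitMillis) {
        InetAddress lo = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, lo);
             Socket client = new Socket(lo, server.getLocalPort());
             Socket broker = server.accept()) {   // accepted, but nobody ever reads from it
            Thread writer = new Thread(() -> {
                try {
                    OutputStream out = client.getOutputStream();
                    byte[] chunk = new byte[64 * 1024];
                    for (int i = 0; i < 4096; i++) {   // 256 MB, far more than the kernel will buffer
                        out.write(chunk);
                        written.addAndGet(chunk.length);
                    }
                } catch (IOException e) {
                    // may be thrown when the socket is closed underneath the blocked write
                }
            }, "writer");
            writer.setDaemon(true);   // a thread stuck in write() must not keep the JVM alive
            writer.start();
            writer.join(waitMillis);
            return writer.isAlive();
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        boolean blocked = writerStillBlocked(2000);
        System.out.println("writer still blocked: " + blocked
                + ", bytes accepted before blocking: " + written.get());
    }
}
```

The printed byte count is just whatever the send and receive buffers absorbed; after that, write() does not return until the peer reads or the socket is closed.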
The other consumers sharing that connection show this stack:
waiting for monitor entry [0xec0ff000..0xec0ffa28]
	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:46)
	- waiting to lock <0x2aba90f0> (a java.lang.Object)
	at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
	at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1155)
	at org.apache.activemq.TransactionContext.begin(TransactionContext.java:201)
	at org.apache.activemq.ActiveMQSession.doStartTransaction(ActiveMQSession.java:1564)
	at org.apache.activemq.ActiveMQMessageConsumer.ackLater(ActiveMQMessageConsumer.java:699)
	at org.apache.activemq.ActiveMQMessageConsumer.beforeMessageIsConsumed(ActiveMQMessageConsumer.java:651)
	at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:487)
In this situation, nobody is able to consume from the socket, since all
the consumers are blocked trying to ack some messages. So, it looks to me
like an implicit deadlock.
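The lock convoy visible in the two dumps (one thread holding the write lock, the rest "waiting for monitor entry") can be modelled with a minimal sketch. It assumes a hypothetical class standing in for MutexTransport: a single monitor serializes writers, and the "write" is simulated by a latch that is never released, because the peer never drains the buffer. This is not ActiveMQ's code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

final class MutexConvoyDemo {
    private final Object writeLock = new Object();                       // role of MutexTransport's lock
    private final CountDownLatch socketDrained = new CountDownLatch(1);  // never counted down: peer never reads

    /** Hypothetical stand-in for MutexTransport.oneway(): take the lock, then "write". */
    void oneway() {
        synchronized (writeLock) {
            try {
                socketDrained.await();   // simulates socketWrite0 stuck on a full send buffer
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
    }

    /** Starts `consumers` threads that each try to send an ack; returns their states after a pause. */
    static List<Thread.State> run(int consumers, long settleMillis) {
        MutexConvoyDemo transport = new MutexConvoyDemo();
        List<Thread> threads = new ArrayList<>();
        for (int i = 0; i < consumers; i++) {
            Thread t = new Thread(transport::oneway, "Session(recv)#" + i);
            t.setDaemon(true);
            threads.add(t);
            t.start();
        }
        try {
            Thread.sleep(settleMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<Thread.State> states = new ArrayList<>();
        for (Thread t : threads) states.add(t.getState());
        for (Thread t : threads) t.interrupt();   // release the simulated hang so the threads can finish
        return states;
    }

    public static void main(String[] args) {
        System.out.println(run(4, 500));   // one lock holder, the rest waiting to enter the monitor
    }
}
```

In this model the lock holder reports WAITING because it parks on a latch; in the real dump it is RUNNABLE, because the JVM reports a thread blocked inside native socket I/O as RUNNABLE. The other threads show BLOCKED, which is what "waiting for monitor entry" means in a thread dump.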
On the other hand, the "ActiveMQ Transport" threads (which I think are
responsible for reading from the socket) seem to be stuck in a similar
situation (this is from a different stack dump):
"ActiveMQ Transport: tcp:///127.0.0.1:25047" daemon prio=10 tid=0x003764c0 nid=0x30 runnable [0xef3fe000..0xef3ffca8]
	at java.net.SocketOutputStream.socketWrite0(Native Method)
	at java.net.SocketOutputStream.socketWrite(Unknown Source)
	at java.net.SocketOutputStream.write(Unknown Source)
	at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:109)
	at java.io.DataOutputStream.flush(Unknown Source)
	at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:119)
	at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:145)
	at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:80)
	at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:93)
	at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:47)
	- locked <0x2ab72b20> (a java.lang.Object)
	at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1138)
	at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:805)
	at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:770)
	at org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:404)
	at org.apache.activemq.broker.region.QueueSubscription.dispatch(QueueSubscription.java:172)
	at org.apache.activemq.broker.region.PrefetchSubscription.dispatchMatched(PrefetchSubscription.java:369)
	at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:204)
	- locked <0x2af14b48> (a org.apache.activemq.broker.region.QueueSubscription)
	at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:299)
	at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:402)
	at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:177)
	at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
	at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
	at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:88)
	at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:506)
	at org.apache.activemq.command.MessageAck.visit(MessageAck.java:179)
	at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
	at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
	at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:65)
	at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:133)
	at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:124)
	at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
	at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:137)
	at java.lang.Thread.run(Unknown Source)
So, would non-blocking IO be a solution for this? After a deeper look,
I'm afraid it wouldn't. From this point of view, the only actual difference
is that transport.nio.NIOOutputStream.flush() writes the data in
chunks, as big as the socket allows, but it does not give up until all the
data has been written. We would not be stuck in the socketWrite0 call; instead,
the "while (remaining > 0)" loop in NIOOutputStream would be
repeated forever, and since the other consumers sharing the connection
are also trying to write to the socket to ack messages, there would be no
chance to free the socket buffer.
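The point about the flush loop can be illustrated with a minimal sketch using java.nio directly (it is not NIOOutputStream's actual code; the class and method names are hypothetical). A non-blocking SocketChannel.write() returns 0 once the send buffer is full, so a "while (remaining > 0)" loop makes no progress while the peer is not reading. The sketch adds a stall timeout only so the demonstration terminates; without it, the loop would spin indefinitely.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

final class NonBlockingFlushDemo {
    /**
     * Writes `payload` bytes to a peer that never reads, using a non-blocking channel.
     * Gives up after stallMillis without progress; returns the bytes left unwritten.
     */
    static long flushUntilStalled(int payload, long stallMillis) {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel broker = server.accept()) {   // accepted but never read from
                client.configureBlocking(false);
                ByteBuffer buf = ByteBuffer.allocateDirect(payload);
                long stallNanos = stallMillis * 1_000_000L;
                long lastProgress = System.nanoTime();
                while (buf.remaining() > 0) {
                    int n = client.write(buf);   // returns 0 when the send buffer is full
                    if (n > 0) {
                        lastProgress = System.nanoTime();
                    } else if (System.nanoTime() - lastProgress > stallNanos) {
                        break;   // demo-only guard: otherwise this would spin indefinitely
                    } else {
                        Thread.onSpinWait();
                    }
                }
                return buf.remaining();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long left = flushUntilStalled(64 * 1024 * 1024, 1000);
        System.out.println("bytes still unwritten after the loop stalled: " + left);
    }
}
```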
If I'm not mistaken, since both the "ActiveMQ Transport" and the "Session"
threads need to write to the socket, this can lead to a situation
where everybody is trying to write, ending with some threads blocked
in the socketWrite0 call and the others waiting for the
MutexTransport write lock to proceed.
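The "everybody is writing, nobody is reading" situation can be sketched with plain java.net sockets, under a simplifying assumption: each end of the connection has a single thread that must finish writing before it reads anything, much as the transport thread above services an ack and then dispatches synchronously on the same thread. This is a hypothetical model, not ActiveMQ's actual threading. When both payloads exceed what the kernel buffers, both threads stay in write() indefinitely.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;

final class WriteWriteDeadlockDemo {
    static final int CHUNK = 64 * 1024;
    static final int CHUNKS = 4096;   // 256 MB per side, far more than the kernel buffers

    /** One side of the conversation: write everything first, only then start reading. */
    static void writeThenRead(Socket s) {
        try {
            OutputStream out = s.getOutputStream();
            byte[] chunk = new byte[CHUNK];
            for (int i = 0; i < CHUNKS; i++) out.write(chunk);   // blocks once both buffers fill
            InputStream in = s.getInputStream();
            byte[] buf = new byte[CHUNK];
            while (in.read(buf) >= 0) {
                // never reached in this demo: both sides are stuck in write() above
            }
        } catch (IOException e) {
            // may be thrown when the sockets are closed at the end of the demo
        }
    }

    /** Returns true if both sides are still stuck in write() after waiting. */
    static boolean bothSidesStuck(long waitMillis) {
        InetAddress lo = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, lo);
             Socket a = new Socket(lo, server.getLocalPort());
             Socket b = server.accept()) {
            Thread ta = new Thread(() -> writeThenRead(a), "client side");
            Thread tb = new Thread(() -> writeThenRead(b), "broker side");
            ta.setDaemon(true);
            tb.setDaemon(true);
            ta.start();
            tb.start();
            ta.join(waitMillis);
            tb.join(waitMillis);
            return ta.isAlive() && tb.isAlive();
        } catch (IOException | InterruptedException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("both sides stuck: " + bothSidesStuck(2000));
    }
}
```

Giving each end a dedicated reader thread that never writes breaks the cycle, because each side's buffers keep draining.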
After this new review, thanks to Filip, I'm afraid that NIO is not going
to be a solution (unfortunately, I have not been able to reproduce the hang in
our labs). So, my questions are:
- Do you agree with this new analysis?
- Is this design still present in the 5.x releases?
I'm not sure whether using separate connections for consumers and
producers would be a valid workaround. Perhaps it would just mitigate the
problem, since more sockets would be involved and therefore more total buffer
space would be available, but I would like to be sure that this cannot
happen, since it leads to service outages in the product.
Best regards, and thanks a lot for your feedback. Looking forward to
more. :)
-
Manuel.