First of all, sorry for going on about this issue, but I really need some help. After some investigation, this is where I've got to:

- Since sockets can block on write operations, I tried to detach those operations from the "ActiveMQ Transport: tcp://blablabla" threads, hoping to keep those threads from blocking in a socketWrite call.
- To achieve that, I declared a per-instance Executor created with Executors.newSingleThreadExecutor(), so that commands should still be processed in order. In the TcpTransport run() method, I have something like:

while (!isStopped()) {
   doRun();
}

And in the doRun method, omitting all the exception handling:

final Object command = readCommand();
if (consumerExecutor != null) {
   consumerExecutor.execute(new Runnable() {
       public void run() {
           doConsume(command);
       }
   });
} else {
   doConsume(command);
}
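The same hand-off can be sketched without ActiveMQ. This is a minimal sketch, assuming the commands can be represented as plain strings; consumeInOrder is an illustrative name, the list stands in for successive readCommand() results, and the task body stands in for doConsume(). It shows that a single-thread executor does run the tasks one at a time and in submission order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SingleThreadHandoff {
    // Hands each "command" to a single-thread executor, as the consumerExecutor
    // change does, and returns the order in which the commands were consumed.
    static List<String> consumeInOrder(List<String> commands) {
        List<String> consumed = Collections.synchronizedList(new ArrayList<>());
        ExecutorService consumerExecutor = Executors.newSingleThreadExecutor();
        for (String command : commands) {                           // stands in for readCommand()
            consumerExecutor.execute(() -> consumed.add(command));  // stands in for doConsume(command)
        }
        consumerExecutor.shutdown();
        try {
            consumerExecutor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        synchronized (consumed) {
            return new ArrayList<>(consumed);
        }
    }

    public static void main(String[] args) {
        System.out.println(consumeInOrder(Arrays.asList("WireFormatInfo", "ConnectionInfo", "SessionInfo")));
        // prints [WireFormatInfo, ConnectionInfo, SessionInfo]
    }
}
```

Note what this pattern does not guarantee: the loop that submits the tasks (the reader) never waits for a task to finish, so it can read command N+1 before command N has been consumed.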

When the consumerExecutor member is null, everything works as expected, in the current, non-detached fashion. However, setting consumerExecutor leads to a lot of wire format errors. For example, when connecting from a client that uses the original ActiveMQ libraries:

[DEBUG][2008-09-22 17:59:06,363][ActiveMQ Transport Server: tcp://localhost:61666][org.apache.activemq.transport.tcp.TcpTransport] Creating TcpTransport(SO_SNDBUF: 49152), SO_RCVBUF: 57344)
[DEBUG][2008-09-22 17:59:06,405][ActiveMQ Transport Server: tcp://localhost:61666][org.apache.activemq.transport.WireFormatNegotiator] Sending: WireFormatInfo { version=2, properties={TightEncodingEnabled=true, CacheSize=1024, TcpNoDelayEnabled=true, SizePrefixDisabled=false, StackTraceEnabled=true, MaxInactivityDuration=30000, CacheEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
[DEBUG][2008-09-22 17:59:06,454][Tcp Transport Consumer][org.apache.activemq.transport.tcp.TcpTransport] Consuming command WireFormatInfo { version=2, properties={TightEncodingEnabled=true, CacheSize=1024, TcpNoDelayEnabled=true, SizePrefixDisabled=false, StackTraceEnabled=true, MaxInactivityDuration=30000, CacheEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
[DEBUG][2008-09-22 17:59:06,549][ActiveMQ Transport: tcp:///127.0.0.1:61078][org.apache.activemq.broker.TransportConnection.Transport] Transport failed: java.io.IOException: Unknown data type: 115
java.io.IOException: Unknown data type: 115
    at org.apache.activemq.openwire.OpenWireFormat.looseUnmarshalNestedObject(OpenWireFormat.java:443)
    at org.apache.activemq.openwire.v2.BaseDataStreamMarshaller.looseUnmarsalNestedObject(BaseDataStreamMarshaller.java:436)
    at org.apache.activemq.openwire.v2.ConnectionInfoMarshaller.looseUnmarshal(ConnectionInfoMarshaller.java:154)
    at org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:349)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:273)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:196)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:175)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:156)
    at java.lang.Thread.run(Unknown Source)
[DEBUG][2008-09-22 17:59:06,580][ActiveMQ Transport: tcp:///127.0.0.1:61078][org.apache.activemq.broker.TransportConnection] Stopping connection: /127.0.0.1:61078
[DEBUG][2008-09-22 17:59:06,584][ActiveMQ Transport: tcp:///127.0.0.1:61078][org.apache.activemq.broker.TransportConnection] Stopped connection: /127.0.0.1:61078
[DEBUG][2008-09-22 17:59:06,577][Tcp Transport Consumer][org.apache.activemq.transport.WireFormatNegotiator] Received WireFormat: WireFormatInfo { version=2, properties={TightEncodingEnabled=true, CacheSize=1024, TcpNoDelayEnabled=true, SizePrefixDisabled=false, StackTraceEnabled=true, MaxInactivityDuration=30000, CacheEnabled=true}, magic=[A,c,t,i,v,e,M,Q]}
[DEBUG][2008-09-22 17:59:06,585][Tcp Transport Consumer][org.apache.activemq.transport.WireFormatNegotiator] tcp:///127.0.0.1:61078 before negotiation: OpenWireFormat{version=2, cacheEnabled=false, stackTraceEnabled=false, tightEncodingEnabled=false, sizePrefixDisabled=false}
[DEBUG][2008-09-22 17:59:06,585][Tcp Transport Consumer][org.apache.activemq.transport.WireFormatNegotiator] tcp:///127.0.0.1:61078 after negotiation: OpenWireFormat{version=2, cacheEnabled=true, stackTraceEnabled=true, tightEncodingEnabled=true, sizePrefixDisabled=false}


Here, "Tcp Transport Consumer" is the name of the consumerExecutor threads.

Similar errors show up on the client side, just with a different data type:

Exception in thread "main" javax.jms.JMSException: Unknown data type: 68
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:46)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1181)
    at org.apache.activemq.ActiveMQConnection.ensureConnectionInfoSent(ActiveMQConnection.java:1263)
    at org.apache.activemq.ActiveMQConnection.start(ActiveMQConnection.java:449)
    at test.JMSThreads.main(JMSTest.java:73)
Caused by: java.io.IOException: Unknown data type: 68
    at org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:342)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:273)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:156)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:136)
    at java.lang.Thread.run(Unknown Source)



So it seems that processing commands in a detached fashion is not possible. But why? To demonstrate this, I put the following code in doRun(), which is called from the TcpTransport run() method:

   protected void doRun() throws IOException {
       try {
           final Object command = readCommand();
           log.debug("READ A SECOND COMMAND");
           Object command2 = readCommand();

Trying to read a second command from the data stream leads to the same result as before:

[DEBUG][2008-09-22 18:21:05,420][ActiveMQ Transport: tcp:///127.0.0.1:61172][org.apache.activemq.transport.tcp.TcpTransport] READ A SECOND COMMAND
[DEBUG][2008-09-22 18:21:05,467][ActiveMQ Transport: tcp:///127.0.0.1:61172][org.apache.activemq.broker.TransportConnection.Transport] Transport failed: java.io.IOException: Unknown data type: 115
java.io.IOException: Unknown data type: 115
    at org.apache.activemq.openwire.OpenWireFormat.looseUnmarshalNestedObject(OpenWireFormat.java:443)
    at org.apache.activemq.openwire.v2.BaseDataStreamMarshaller.looseUnmarsalNestedObject(BaseDataStreamMarshaller.java:436)
    at org.apache.activemq.openwire.v2.ConnectionInfoMarshaller.looseUnmarshal(ConnectionInfoMarshaller.java:154)
    at org.apache.activemq.openwire.OpenWireFormat.doUnmarshal(OpenWireFormat.java:349)
    at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:273)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:198)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:177)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:156)
    at java.lang.Thread.run(Unknown Source)
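One possible reading of these logs, offered as a hypothesis rather than a confirmed diagnosis: the failing unmarshal in the reader thread goes through looseUnmarshalNestedObject, while the consumer thread only logs "after negotiation: ... tightEncodingEnabled=true" later (17:59:06,585, versus the failure at 06,549). If consuming one command can change how the next one must be decoded, reading ahead of consumption would corrupt the stream, which would also fit the readCommand()-twice experiment. The toy program below reproduces that effect in isolation; Decoder, NEGOTIATE and the marker byte are invented for illustration and are not OpenWire.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class ReadAheadDemo {
    // Hypothetical negotiated format (NOT OpenWire): frames carry a 1-byte type
    // marker until a NEGOTIATE command has been *consumed*; after that they don't.
    static final class Decoder {
        private boolean negotiated = false;

        String readCommand(DataInputStream in) throws IOException {
            if (!negotiated) {
                int type = in.readUnsignedByte();
                if (type != 1) {
                    throw new IOException("Unknown data type: " + type);
                }
            }
            return in.readUTF();
        }

        void consume(String command) {
            if (command.equals("NEGOTIATE")) {
                negotiated = true; // changes how the *next* frame must be decoded
            }
        }
    }

    // What the peer sends: a marked NEGOTIATE frame, then an unmarked CONNECT frame.
    static DataInputStream peerStream() {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bytes);
            out.writeByte(1);
            out.writeUTF("NEGOTIATE");
            out.writeUTF("CONNECT");
            out.flush();
            return new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()));
        } catch (IOException e) {
            throw new IllegalStateException(e); // not expected for in-memory streams
        }
    }

    // Read one command, consume it, then read the next (the working order).
    static String readThenConsume() {
        Decoder decoder = new Decoder();
        DataInputStream in = peerStream();
        try {
            String first = decoder.readCommand(in);
            decoder.consume(first);
            String second = decoder.readCommand(in);
            decoder.consume(second);
            return first + "," + second;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    // Read the second command before the first has been consumed.
    static String readAhead() {
        Decoder decoder = new Decoder();
        DataInputStream in = peerStream();
        try {
            String first = decoder.readCommand(in);
            String second = decoder.readCommand(in); // decoded with stale state
            decoder.consume(first);
            decoder.consume(second);
            return first + "," + second;
        } catch (IOException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(readThenConsume()); // prints NEGOTIATE,CONNECT
        System.out.println(readAhead());       // prints Unknown data type: 0
    }
}
```

In the read-ahead case the decoder interprets the first byte of CONNECT's length prefix as a type marker, much as a real decoder still in its pre-negotiation mode would misread a frame encoded after negotiation.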


What is actually reading from the stream? Shouldn't it wait until the next command is available?
I'm a little lost here, so please shed some light on this problem.


Desperately,

Manuel.



Manuel Teira Paz wrote:
Filip Hanik - Dev Lists wrote:
It might help to set transport.soTimeout; that should cause the
socketWrite to back out if it gets stuck.

AFAIK, soTimeout only applies to reads, not writes. At least that's
what I've understood from the Java API docs.
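For reference, this is the behaviour the java.net.Socket Javadoc describes for setSoTimeout: the timeout bounds a blocking read() on the socket's InputStream. The sketch below checks it on loopback; readTimesOut is an illustrative helper, and how ActiveMQ's transport.soTimeout option maps onto the socket is assumed here rather than verified.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;

public class SoTimeoutDemo {
    // Returns true if a blocking read() gives up after soTimeoutMillis
    // because the peer never sends anything.
    static boolean readTimesOut(int soTimeoutMillis) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket peer = server.accept()) {       // peer stays silent
            client.setSoTimeout(soTimeoutMillis);   // SO_TIMEOUT: bounds read(), not write()
            InputStream in = client.getInputStream();
            in.read();
            return false;
        } catch (SocketTimeoutException e) {
            return true;                            // the read backed out
        } catch (IOException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(readTimesOut(200)); // prints true
    }
}
```

A write stuck on a full peer buffer, as in the thread dumps further down, gets no such exception: java.net.Socket has no write-side counterpart to setSoTimeout.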

What you're describing is a common problem with many AMQ installations,
and yes, the correct way to fix it would be to remove the locks that
appear in the stack trace.


Those locks are actually there to prevent different threads/sessions
sharing a given connection from using the transport at the same time.
I don't think they're the problem.
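To illustrate why a lock like MutexTransport's is there at all, here is a toy sink, assuming (for illustration only) that one command is written in several separate steps. MutexSink, oneway(tag, length) and the bracketed frame format are invented and are not ActiveMQ classes.

```java
import java.io.ByteArrayOutputStream;
import java.util.ArrayList;
import java.util.List;

public class MutexOnewayDemo {
    // Hypothetical stand-in for a MutexTransport-style filter (not ActiveMQ code):
    // each command is written in several steps, and the lock keeps one sender's
    // steps from interleaving with another sender's.
    static final class MutexSink {
        private final Object writeLock = new Object();
        private final ByteArrayOutputStream out = new ByteArrayOutputStream();

        void oneway(char tag, int length) {
            synchronized (writeLock) {
                out.write('[');                     // frame start
                for (int i = 0; i < length; i++) {
                    out.write(tag);                 // body, one byte per call
                }
                out.write(']');                     // frame end
            }
        }

        String contents() {
            synchronized (writeLock) {
                return out.toString();
            }
        }
    }

    // Several threads (think: sessions sharing one connection) send concurrently.
    static String run(int threads, int commandsPerThread) {
        MutexSink sink = new MutexSink();
        List<Thread> senders = new ArrayList<>();
        for (int t = 0; t < threads; t++) {
            char tag = (char) ('a' + t);
            Thread sender = new Thread(() -> {
                for (int i = 0; i < commandsPerThread; i++) {
                    sink.oneway(tag, 50);
                }
            });
            senders.add(sender);
            sender.start();
        }
        for (Thread sender : senders) {
            try {
                sender.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return sink.contents();
    }

    // True if every [...] frame holds a single repeated tag, i.e. nothing interleaved.
    static boolean framesIntact(String stream) {
        int i = 0;
        while (i < stream.length()) {
            int end = stream.indexOf(']', i);
            if (stream.charAt(i) != '[' || end < i + 2) {
                return false;
            }
            char tag = stream.charAt(i + 1);
            for (int j = i + 1; j < end; j++) {
                if (stream.charAt(j) != tag) {
                    return false;
                }
            }
            i = end + 1;
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(framesIntact(run(4, 200)));
    }
}
```

The flip side is visible in the thread dumps further down: if the thread holding the lock blocks inside the critical section, every other sender on that connection queues up behind it.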

IMHO, the problem is that the TCP transport threads are writing acks,
so if they block on those writes, nobody is going to read from the
socket anymore, and we will end up with more threads blocked trying to
write to that socket.

I've tried moving doConsume into a different thread. It's an easy
change, but it doesn't work: I don't know why, but subsequent
readCommand attempts failed with EOF exceptions. I can't see what
difference it makes to the readCommand call whether doConsume runs in
the same thread or in a different one. Any hints?

In the meantime, as a temporary workaround, I'm going to increase the
socket's send and receive buffers to try to avoid the effect.
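At the plain java.net level, that workaround amounts to requesting larger SO_SNDBUF/SO_RCVBUF values, like the SO_SNDBUF: 49152 and SO_RCVBUF: 57344 reported in the TcpTransport log line earlier. This is a sketch only: requestBuffers is a hypothetical helper, and the ActiveMQ transport options that control these sizes are not shown.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketBufferDemo {
    // Requests larger SO_SNDBUF/SO_RCVBUF on a client socket and returns the
    // sizes the OS actually granted as {send, receive}; the OS may round or cap them.
    static int[] requestBuffers(int bytes) {
        try (ServerSocket server = new ServerSocket()) {
            server.setReceiveBufferSize(bytes);     // before bind, so accepted sockets get it
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (Socket client = new Socket()) {
                client.setSendBufferSize(bytes);
                client.setReceiveBufferSize(bytes); // before connect, so it can affect the TCP window
                client.connect(server.getLocalSocketAddress());
                return new int[] { client.getSendBufferSize(), client.getReceiveBufferSize() };
            }
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        int[] granted = requestBuffers(256 * 1024);
        System.out.println("SO_SNDBUF=" + granted[0] + ", SO_RCVBUF=" + granted[1]);
    }
}
```

Bigger buffers only raise the amount of unread data a writer can push before it blocks; they do not remove the cycle in which the thread that should be reading is itself waiting to write.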


Regards

Manuel.

Filip

Manuel Teira Paz wrote:

Filip Hanik - Dev Lists wrote:

Hi Manuel,
I may not be understanding your theory completely, but if I do, I'd have
to disagree with parts of your assessment.

The problem you describe doesn't really have anything to do with
blocking vs. non-blocking IO; instead, it's the implementation on top of
the socket API.

With a simple Java program, you can read from and write to a blocking
socket simultaneously.
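A minimal check of that point, using only java.net: both ends of a loopback connection write and read the same number of bytes at the same time over ordinary blocking sockets, with one thread per direction per end. exchange, readAll, writeAll and startQuietly are illustrative names, not part of any library.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicLong;

public class FullDuplexDemo {
    interface IoAction {
        void run() throws IOException;
    }

    // Runs an I/O action on its own thread; a failure shows up later as a short byte count.
    static Thread startQuietly(IoAction action) {
        Thread thread = new Thread(() -> {
            try {
                action.run();
            } catch (IOException ignored) {
                // reported indirectly through the byte counts
            }
        });
        thread.start();
        return thread;
    }

    static void writeAll(OutputStream out, long total) throws IOException {
        byte[] chunk = new byte[8192];
        for (long left = total; left > 0; ) {
            int n = (int) Math.min(chunk.length, left);
            out.write(chunk, 0, n);
            left -= n;
        }
    }

    static long readAll(InputStream in, long total) throws IOException {
        byte[] buf = new byte[8192];
        long got = 0;
        while (got < total) {
            int n = in.read(buf, 0, (int) Math.min(buf.length, total - got));
            if (n < 0) {
                break;                                  // EOF before everything arrived
            }
            got += n;
        }
        return got;
    }

    // Both ends send `total` bytes while receiving `total` bytes. Returns {clientGot, serverGot}.
    static long[] exchange(long total) {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket peer = server.accept()) {
            AtomicLong serverGot = new AtomicLong();
            Thread serverReader = startQuietly(() -> serverGot.set(readAll(peer.getInputStream(), total)));
            Thread serverWriter = startQuietly(() -> writeAll(peer.getOutputStream(), total));
            Thread clientWriter = startQuietly(() -> writeAll(client.getOutputStream(), total));
            long clientGot = readAll(client.getInputStream(), total); // this thread is the client's reader
            clientWriter.join();
            serverWriter.join();
            serverReader.join();
            return new long[] { clientGot, serverGot.get() };
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        long[] got = exchange(4L * 1024 * 1024);
        System.out.println("client received " + got[0] + ", server received " + got[1]);
    }
}
```

With exchange(4L * 1024 * 1024), both sides should receive 4194304 bytes. The transfer completes because each direction always has a thread draining it; things only stop when the sole potential reader is itself stuck writing.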


Hello Filip, and thanks for your comments. Yes, you must be able to
read and write simultaneously on a given socket. Sorry for not being
clear enough in my explanation. I've taken a deeper look into the
details, and the actual problem is that all my consumer threads get
blocked: one trying to write to the socket, and the others trying to
acquire the MutexTransport lock in the transport filter chain. What
they are actually trying to do is ack some already sent messages. The
stack looks like this:

"Session(recv,TaskManagerQueue)#56" prio=10 tid=0x00b4a6d8 nid=0x6a runnable [0xeb9fe000..0xeb9ffaa8]
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:109)
    at java.io.DataOutputStream.flush(Unknown Source)
    at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:119)
    at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:145)
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:80)
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:93)
    at org.apache.activemq.transport.failover.FailoverTransport.oneway(FailoverTransport.java:392)
    - locked <0x2abab308> (a java.lang.Object)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:47)
    - locked <0x2aba90f0> (a java.lang.Object)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1155)
    at org.apache.activemq.TransactionContext.begin(TransactionContext.java:201)
    at org.apache.activemq.ActiveMQSession.doStartTransaction(ActiveMQSession.java:1564)
    at org.apache.activemq.ActiveMQMessageConsumer.ackLater(ActiveMQMessageConsumer.java:699)
    at org.apache.activemq.ActiveMQMessageConsumer.beforeMessageIsConsumed(ActiveMQMessageConsumer.java:651)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:487)


That is the lock holder, and it never leaves
java.net.SocketOutputStream.socketWrite0. What I understand from that
stack is that it is trying to ack some messages before actually
consuming the message, so it has to send through the socket. The thread
is runnable, but since the socket buffer is full, it cannot continue.
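The same state can be reproduced without ActiveMQ: a peer that accepts a connection but never reads lets the writer fill the socket buffers, after which the writer makes no further progress. A thread blocked in a native socket write still shows as runnable in a Java thread dump, which matches the Session thread above. writerStalls is a hypothetical helper, and the 500 ms and 1 s sampling intervals are arbitrary assumptions.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.atomic.AtomicLong;

public class BlockedWriteDemo {
    // Returns true if a writer thread stops making progress because its peer
    // accepted the connection but never reads from it.
    static boolean writerStalls() {
        InetAddress loopback = InetAddress.getLoopbackAddress();
        try (ServerSocket server = new ServerSocket(0, 1, loopback);
             Socket client = new Socket(loopback, server.getLocalPort());
             Socket peer = server.accept()) {        // peer never reads
            AtomicLong written = new AtomicLong();
            Thread writer = new Thread(() -> {
                byte[] chunk = new byte[64 * 1024];
                try {
                    OutputStream out = client.getOutputStream();
                    while (true) {
                        out.write(chunk);              // blocks once the socket buffers are full
                        written.addAndGet(chunk.length);
                    }
                } catch (IOException e) {
                    // expected once the sockets are closed at the end of the try block
                }
            }, "writer");
            writer.setDaemon(true);                    // never keeps the JVM alive
            writer.start();
            Thread.sleep(500);                         // ample time to fill loopback buffers
            long before = written.get();
            Thread.sleep(1000);
            // Alive, but no bytes accepted for a full second: stuck in the native write.
            return writer.isAlive() && written.get() == before;
        } catch (IOException | InterruptedException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("writer stalled: " + writerStalls());
    }
}
```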

The other consumers sharing that connection show this stack:

waiting for monitor entry [0xec0ff000..0xec0ffa28]
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:46)
    - waiting to lock <0x2aba90f0> (a java.lang.Object)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    at org.apache.activemq.ActiveMQConnection.asyncSendPacket(ActiveMQConnection.java:1155)
    at org.apache.activemq.TransactionContext.begin(TransactionContext.java:201)
    at org.apache.activemq.ActiveMQSession.doStartTransaction(ActiveMQSession.java:1564)
    at org.apache.activemq.ActiveMQMessageConsumer.ackLater(ActiveMQMessageConsumer.java:699)
    at org.apache.activemq.ActiveMQMessageConsumer.beforeMessageIsConsumed(ActiveMQMessageConsumer.java:651)
    at org.apache.activemq.ActiveMQMessageConsumer.receive(ActiveMQMessageConsumer.java:487)


In this situation, nobody is able to consume from the socket, since
all the consumers are blocked trying to ack some messages. So it looks
to me like an implicit deadlock.
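The other half of that picture, reduced to plain Java: one thread holds a monitor while it is stuck (here it waits on a latch, standing in for socketWrite0), and a second thread that needs the same monitor ends up BLOCKED, which a thread dump prints as "waiting for monitor entry" / "waiting to lock". The method and thread names are illustrative only.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class MonitorContentionDemo {
    // One thread takes the write lock and then gets stuck inside it; a second
    // thread then tries to take the same lock. Returns the second thread's state.
    static Thread.State stateOfSecondSender() {
        Object writeLock = new Object();
        CountDownLatch holding = new CountDownLatch(1);
        CountDownLatch stuck = new CountDownLatch(1);

        Thread holder = new Thread(() -> {
            synchronized (writeLock) {
                holding.countDown();
                try {
                    stuck.await();                   // stands in for a write that never completes
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }, "holder");
        Thread second = new Thread(() -> {
            synchronized (writeLock) {
                // would send its own ack here
            }
        }, "second");
        holder.setDaemon(true);
        second.setDaemon(true);
        try {
            holder.start();
            holding.await();                         // make sure the lock is taken first
            second.start();
            long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(5);
            while (second.getState() != Thread.State.BLOCKED && System.nanoTime() < deadline) {
                Thread.sleep(10);
            }
            return second.getState();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        } finally {
            stuck.countDown();                       // let both threads finish
        }
    }

    public static void main(String[] args) {
        System.out.println(stateOfSecondSender());
    }
}
```

In the toy, releasing the latch frees everyone; in the real case, only progress on the socket can release the holder, which is why it behaves like a deadlock even though no two locks are taken in opposite orders.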

On the other side, the "ActiveMQ Transport" threads (I think they are
responsible for reading from the socket) seem to be stuck in a similar
situation (this is from a different stack dump):

"ActiveMQ Transport: tcp:///127.0.0.1:25047" daemon prio=10 tid=0x003764c0 nid=0x30 runnable [0xef3fe000..0xef3ffca8]
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(Unknown Source)
    at java.net.SocketOutputStream.write(Unknown Source)
    at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:109)
    at java.io.DataOutputStream.flush(Unknown Source)
    at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:119)
    at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:145)
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:80)
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:93)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:47)
    - locked <0x2ab72b20> (a java.lang.Object)
    at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1138)
    at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:805)
    at org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:770)
    at org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:404)
    at org.apache.activemq.broker.region.QueueSubscription.dispatch(QueueSubscription.java:172)
    at org.apache.activemq.broker.region.PrefetchSubscription.dispatchMatched(PrefetchSubscription.java:369)
    at org.apache.activemq.broker.region.PrefetchSubscription.acknowledge(PrefetchSubscription.java:204)
    - locked <0x2af14b48> (a org.apache.activemq.broker.region.QueueSubscription)
    at org.apache.activemq.broker.region.AbstractRegion.acknowledge(AbstractRegion.java:299)
    at org.apache.activemq.broker.region.RegionBroker.acknowledge(RegionBroker.java:402)
    at org.apache.activemq.broker.TransactionBroker.acknowledge(TransactionBroker.java:177)
    at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
    at org.apache.activemq.broker.BrokerFilter.acknowledge(BrokerFilter.java:74)
    at org.apache.activemq.broker.MutableBrokerFilter.acknowledge(MutableBrokerFilter.java:88)
    at org.apache.activemq.broker.TransportConnection.processMessageAck(TransportConnection.java:506)
    at org.apache.activemq.command.MessageAck.visit(MessageAck.java:179)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:185)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:65)
    at org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:133)
    at org.apache.activemq.transport.InactivityMonitor.onCommand(InactivityMonitor.java:124)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:84)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:137)
    at java.lang.Thread.run(Unknown Source)


So, would non-blocking IO be a solution for this? After a deeper look,
I'm afraid it wouldn't. The only real difference from this point of
view is that transport.nio.NIOOutputStream.flush() writes the data in
chunks, as big as the socket allows, but it won't give up until all
the data has been written. We won't be stuck in the socketWrite0 call;
instead, the "while (remaining > 0)" loop in NIOOutputStream will be
repeated forever, and since the other consumers sharing the connection
are also trying to write to the socket to ack some message, there will
be no chance to free the socket buffer.
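At the java.nio level the difference looks like this: on a non-blocking channel whose peer never reads, write() eventually returns 0 instead of blocking, so a loop that retries until nothing remains just spins. bytesBeforeZeroWrite is a hypothetical helper, not NIOOutputStream's code, and the retry cap exists only so the sketch terminates.

```java
import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

public class NonBlockingWriteDemo {
    // Writes to a non-blocking channel whose peer never reads. Returns the number
    // of bytes accepted before write() first returned 0, or -1 if that never happened.
    static long bytesBeforeZeroWrite() {
        try (ServerSocketChannel server = ServerSocketChannel.open()) {
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));
            try (SocketChannel client = SocketChannel.open(server.getLocalAddress());
                 SocketChannel peer = server.accept()) {   // peer never reads
                client.configureBlocking(false);
                ByteBuffer chunk = ByteBuffer.allocate(64 * 1024);
                long total = 0;
                for (int attempt = 0; attempt < 100_000; attempt++) {
                    chunk.clear();
                    int n = client.write(chunk);
                    if (n == 0) {
                        // A "while (remaining > 0)" loop would retry here forever,
                        // since nothing on this connection drains the peer's buffer.
                        return total;
                    }
                    total += n;
                }
                return -1;
            }
        } catch (IOException e) {
            return -1;
        }
    }

    public static void main(String[] args) {
        System.out.println("bytes accepted before write() returned 0: " + bytesBeforeZeroWrite());
    }
}
```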

If I'm not mistaken, since both the "ActiveMQ Transport" and "Session"
threads need to write to the socket, this can lead to a situation where
everybody is trying to do so, ending with some threads blocked in the
socketWrite0 call and others waiting for the MutexTransport write lock.

After this new review (thanks to Filip), I'm afraid NIO is not going
to be a solution (unfortunately, I was not able to reproduce the hang
in our lab). So, my questions are:

- Do you agree with this new analysis?
- Is this design still present in the 5.x releases?

I'm not sure whether using different connections for consumers and
producers would be a valid workaround. Perhaps it would just mitigate
the situation, since more sockets would be involved and so more buffer
space would effectively be available, but I would like to be sure this
cannot happen, since it leads to service unavailability in the product.

Best regards, and thanks a lot for your feedback. Looking forward to
more. :)

-
Manuel.






