[ https://issues.apache.org/jira/browse/AMQ-3605?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13167478#comment-13167478 ]
Tommy Lindgren edited comment on AMQ-3605 at 12/12/11 2:32 PM:
---------------------------------------------------------------

I've done some testing against 5.6-SNAPSHOT (as of 2011-11-23). I don't see the NPE anymore, but clients closing the socket uncleanly still seem to be a problem. It looks like my affected client now hangs on the SEND call.

I can reproduce this problem within minutes by stressing my clients while a script repeatedly forks, connects to AMQ, subscribes to a topic and then calls _exit() (to make sure the socket isn't closed by the garbage collector).

The resulting AMQ log is 200MB+ and may contain sensitive information, so I can't attach it as is. Just before the client hangs I see (log excerpts slightly edited; message bodies and headers are removed):

{noformat}
2011-12-12 10:56:21,776 | TRACE | Received: SEND [...]
2011-12-12 10:56:21,776 | DEBUG | message is ActiveMQTextMessage {commandId = 2262, responseRequired = false, messageId = ID:dev01-54928-1323379526715-4:15980:-1:1:2254, originalDestination = null, originalTransactionId = null, producerId = ID:dev01-54928-1323379526715-4:15980:-1:1, destination = topic://change.TRANSFER, transactionId = null, expiration = 0, timestamp = 1323683781775, arrival = 0, brokerInTime = 1323683781775, brokerOutTime = 1323683781775, correlationId = null, replyTo = null, persistent = false, type = EVENT.ResourceChangeEvent, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1507, properties = {[...]}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = <?xml version="1.0" encoding="UTF-8"?> <atom:...atom:entry> }
2011-12-12 10:56:21,776 | DEBUG | Message available, but continuation is already resumed. Buffer for next time.
2011-12-12 10:56:21,776 | DEBUG | Transport failed: java.io.EOFException
java.io.EOFException
    at java.io.DataInputStream.readByte(DataInputStream.java:250)
    at org.apache.activemq.transport.stomp.StompWireFormat.readHeaderLine(StompWireFormat.java:155)
    at org.apache.activemq.transport.stomp.StompWireFormat.readLine(StompWireFormat.java:148)
    at org.apache.activemq.transport.stomp.StompWireFormat.parseAction(StompWireFormat.java:170)
    at org.apache.activemq.transport.stomp.StompWireFormat.unmarshal(StompWireFormat.java:98)
    at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:229)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:221)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:204)
    at java.lang.Thread.run(Thread.java:662)
2011-12-12 10:56:21,776 | DEBUG | message for ActiveMQMessageConsumer { value=ID:dev01-54928-1323379526715-8:58:1:1, started=true } continuation=org.eclipse.jetty.server.AsyncContinuation@e9b412d@IDLE,initial
2011-12-12 10:56:21,776 | DEBUG | message for ActiveMQMessageConsumer { value=ID:dev01-54928-1323379526715-8:57:1:1, started=true } continuation=org.eclipse.jetty.server.AsyncContinuation@50afeae@IDLE,initial
2011-12-12 10:56:21,776 | TRACE | org.apache.activemq.broker.region.cursors.QueueStorePrefetch@73a2335d:daemon.archived,batchResetNeeded=false,storeHasMessages=true,size=0,cacheEnabled=true - fillBatch
2011-12-12 10:56:21,777 | TRACE | ID:dev01-54928-1323379526715-8:58:1:1 received message: MessageDispatch {commandId = 391, responseRequired = false, consumerId = ID:dev01-54928-1323379526715-8:58:1:1, destination = topic://change.TRANSFER, message = ActiveMQTextMessage {commandId = 2262, responseRequired = false, messageId = ID:dev01-54928-1323379526715-4:15980:-1:1:2254, originalDestination = null, originalTransactionId = null, producerId = ID:dev01-54928-1323379526715-4:15980:-1:1, destination = topic://change.TRANSFER, transactionId = null, expiration = 0, timestamp = 1323683781775, arrival = 0, brokerInTime = 1323683781775, brokerOutTime = 1323683781775, correlationId = null, replyTo = null, persistent = false, type = EVENT.ResourceChangeEvent, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = null, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 1507, properties = {[...]}, readOnlyProperties = true, readOnlyBody = true, droppable = false, text = <?xml version="1.0" encoding="UTF-8"?> <atom:...atom:entry> }, redeliveryCounter = 0}
2011-12-12 10:56:21,776 | DEBUG | Transport failed: java.net.SocketException: Broken pipe
java.net.SocketException: Broken pipe
    at java.net.SocketOutputStream.socketWrite0(Native Method)
    at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:92)
    at java.net.SocketOutputStream.write(SocketOutputStream.java:136)
    at org.apache.activemq.transport.tcp.TcpBufferedOutputStream.flush(TcpBufferedOutputStream.java:115)
    at java.io.DataOutputStream.flush(DataOutputStream.java:106)
    at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:184)
    at org.apache.activemq.transport.stomp.StompTransportFilter.sendToStomp(StompTransportFilter.java:97)
    at org.apache.activemq.transport.stomp.StompSubscription.onMessageDispatch(StompSubscription.java:99)
    at org.apache.activemq.transport.stomp.ProtocolConverter.onActiveMQCommand(ProtocolConverter.java:746)
    at org.apache.activemq.transport.stomp.StompTransportFilter.oneway(StompTransportFilter.java:64)
    at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:262)
    at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:244)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
    at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1281)
    at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:830)
    at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:866)
    at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:98)
    at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:36)
{noformat}

The "Received: SEND" message is from my affected client. I see a bunch of such messages after the SocketException before the client hangs two seconds later. I assume the client hangs because some buffer at some level is full. I can't see anything obvious explaining why AMQ stops processing the client's messages.

Occasionally I also see the following exception:

{noformat}
java.lang.IllegalStateException: Cannot lookup a producer from a connection that had not been registered: ID:dev01-54928-1323379526715-4:59
    at org.apache.activemq.broker.SingleTransportConnectionStateRegister.lookupConnectionState(SingleTransportConnectionStateRegister.java:93)
    at org.apache.activemq.broker.TransportConnection.lookupConnectionState(TransportConnection.java:1438)
    at org.apache.activemq.broker.TransportConnection.getProducerBrokerExchange(TransportConnection.java:1314)
    at org.apache.activemq.broker.TransportConnection.processMessage(TransportConnection.java:468)
    at org.apache.activemq.command.ActiveMQMessage.visit(ActiveMQMessage.java:681)
    at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:318)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:181)
    at org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:45)
    at org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:229)
    at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:87)
    at org.apache.activemq.transport.stomp.ProtocolConverter.sendToActiveMQ(ProtocolConverter.java:166)
    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompSend(ProtocolConverter.java:292)
    at org.apache.activemq.transport.stomp.ProtocolConverter.onStompCommand(ProtocolConverter.java:204)
    at org.apache.activemq.transport.stomp.StompTransportFilter.onCommand(StompTransportFilter.java:76)
    at org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
    at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:222)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:204)
{noformat}

This exception is triggered more often when I run the stress test, but sometimes it doesn't appear to have been thrown at all when the client has hung.

Not sure how useful these log excerpts are. I haven't tested the attached patch yet. If you think it will help I can try to build AMQ myself.

> NullPointerException in TransportConnection
> -------------------------------------------
>
> Key: AMQ-3605
> URL: https://issues.apache.org/jira/browse/AMQ-3605
> Project: ActiveMQ
> Issue Type: Bug
> Components: Transport
> Affects Versions: 5.5.0
> Environment: SLES 11 SP1
>     java version "1.6.0_23"
>     perl 5.10.0 + Net::Stomp 0.38_99
> Reporter: Tommy Lindgren
> Priority: Critical
> Labels: stomp
> Attachments: AMQ-3605-Patch.txt, StompDedicatedTaskRunnerTests.java, StompDedicatedTaskRunnerTests.java
>
>
> I'm running ActiveMQ 5.5.0 and clients using Net::Stomp 0.38_99 and I'm
> seeing infrequent NullPointerExceptions in TransportConnection:
> {noformat}
> Exception in thread "ActiveMQ Connection Dispatcher: /172.31.201.11:50607" java.lang.NullPointerException
>     at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:327)
>     at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:179)
>     at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:69)
>     at org.apache.activemq.transport.stomp.StompTransportFilter.sendToActiveMQ(StompTransportFilter.java:81)
>     at org.apache.activemq.transport.stomp.StompSubscription.onMessageDispatch(StompSubscription.java:79)
>     at org.apache.activemq.transport.stomp.ProtocolConverter.onActiveMQCommand(ProtocolConverter.java:596)
>     at org.apache.activemq.transport.stomp.StompTransportFilter.oneway(StompTransportFilter.java:58)
>     at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:40)
>     at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1270)
>     at org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:815)
>     at org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:851)
>     at org.apache.activemq.thread.DedicatedTaskRunner.runTask(DedicatedTaskRunner.java:104)
>     at org.apache.activemq.thread.DedicatedTaskRunner$1.run(DedicatedTaskRunner.java:42)
> {noformat}
> This seems to happen 1-2 times per month or so but the result is dire: new
> messages aren't delivered to the affected client (you can see the number of
> pending messages increasing in the admin web interface) until the client or
> ActiveMQ is restarted.
> Relevant code snippet from TransportConnection.java,
> {noformat}
> 326 if (context != null) {
> 327 if (context.isDontSendReponse()) {
> {noformat}
> implies that we are dealing with a race condition. I'm not familiar with the
> ActiveMQ code base but it looks like it grabs a lock (serviceLock) before
> entering that function, so I'm not sure what's going on.
> Since there's no timestamp associated with the stack trace I'm not completely
> sure what's happening on the client side. I've tried to reproduce it by
> writing a small script that uses Net::Stomp in a similar way to my real
> clients, but no luck so far.
> No idea if it's relevant, but my affected clients have been both consuming
> and producing, and sending/receiving on both topics and queues.
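
For anyone who wants to try the reproduction described in the comment above (fork, connect, subscribe, then _exit()), here is a minimal sketch in Python rather than the Perl/Net::Stomp that the real clients use. It is not the script that produced the logs. The host, the port 61613 (ActiveMQ's default STOMP port), the /topic/change.TRANSFER destination, an unauthenticated CONNECT, and the helper names build_frame, abrupt_subscriber and run are all assumptions made for illustration.

```python
import os
import socket


def build_frame(command, headers=None, body=""):
    """Encode a STOMP 1.0 frame: command, headers, blank line, body, NUL."""
    lines = [command]
    for key, value in (headers or {}).items():
        lines.append(f"{key}:{value}")
    return ("\n".join(lines) + "\n\n" + body).encode("utf-8") + b"\x00"


def abrupt_subscriber(host, port, destination):
    """Connect and subscribe, then exit without sending DISCONNECT."""
    sock = socket.create_connection((host, port))
    # Assumption: the broker accepts CONNECT without login/passcode.
    sock.sendall(build_frame("CONNECT"))
    sock.recv(4096)  # wait for the broker's reply (normally CONNECTED)
    sock.sendall(build_frame("SUBSCRIBE",
                             {"destination": destination, "ack": "auto"}))
    # os._exit skips interpreter cleanup, so no orderly client shutdown runs.
    os._exit(0)


def run(host="localhost", port=61613,
        destination="/topic/change.TRANSFER", rounds=100):
    """Fork `rounds` short-lived subscribers one after another (Unix only)."""
    for _ in range(rounds):
        pid = os.fork()
        if pid == 0:
            try:
                abrupt_subscriber(host, port, destination)
            finally:
                # Reached only if connecting or sending raised an exception.
                os._exit(1)
        os.waitpid(pid, 0)
```

Calling run() while other clients are publishing to the same topic approximates the stress setup. The kernel still closes the socket when the child exits; what the broker never sees is a DISCONNECT frame, so it should only notice the dead subscriber through a failed read or write, as in the EOFException and "Broken pipe" traces above.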