I am using activemq-4.0.2. I am running a test with 6 producers continuously sending STOMP messages to a queue, with 6 consumers on the same queue. If I kill -9 all the consumers (which are all in one process) and then restart them, sometimes (about 30% of the time) the consumers cannot get any messages anymore. If I inspect the broker, I see only one consumer registered.
After some digging, I found that the reason is that org.apache.activemq.broker.region.Queue.addSubscription is blocked at dispatchValve.turnOff(). That happens because, when the previous consumers are killed, dispatchValve.turnOn() (line 241) in Queue.removeSubscription is never invoked on the broker side. A little more poking reveals that in all the failed cases, line 233, dispatchPolicy.dispatch(context, node, msgContext, consumers), invokes removeSubscription again and so causes the deadlock: the thread that is supposed to call dispatchValve.turnOn() instead calls removeSubscription again, which calls dispatchValve.turnOff(), which blocks.
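
To make the pattern concrete, here is a minimal sketch of the re-entrant call. It is not the ActiveMQ code: SimpleValve, removeSubscription and removeCompletes are hypothetical stand-ins, and the valve is assumed to be a simple non-re-entrant on/off gate. It only shows how a thread that has turned the valve off blocks itself if it re-enters turnOff() before reaching turnOn().

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class ValveDeadlockSketch {

    /** Hypothetical stand-in for the broker's valve; NOT the ActiveMQ class. */
    static final class SimpleValve {
        private boolean on = true;

        // Waits until the valve is on, then switches it off.
        synchronized void turnOff() throws InterruptedException {
            while (!on) {
                wait();
            }
            on = false;
        }

        // Switches the valve back on and wakes any waiting threads.
        synchronized void turnOn() {
            on = true;
            notifyAll();
        }
    }

    // Mimics the shape of the reported call path: turnOff, dispatch, turnOn.
    // When dispatchFails is true, the "dispatch" step re-enters this method
    // on the same thread, as the failed transport does in the report.
    static void removeSubscription(SimpleValve valve, boolean dispatchFails)
            throws InterruptedException {
        valve.turnOff();
        if (dispatchFails) {
            removeSubscription(valve, false); // blocks inside turnOff()
        }
        valve.turnOn(); // never reached when the nested call blocks
    }

    // Runs removeSubscription on a worker thread and reports whether it
    // finished within timeoutMs milliseconds.
    public static boolean removeCompletes(boolean dispatchFails, long timeoutMs) {
        SimpleValve valve = new SimpleValve();
        CountDownLatch done = new CountDownLatch(1);
        Thread worker = new Thread(() -> {
            try {
                removeSubscription(valve, dispatchFails);
                done.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        worker.setDaemon(true);
        worker.start();
        boolean finished;
        try {
            finished = done.await(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            finished = false;
        }
        worker.interrupt(); // release the stuck worker, if any
        return finished;
    }

    public static void main(String[] args) {
        System.out.println("clean removal completes: " + removeCompletes(false, 500));
        System.out.println("re-entrant removal completes: " + removeCompletes(true, 500));
    }
}
```

In this sketch the first call finishes and the second times out, which matches the symptom: any later caller of turnOff() (addSubscription in the broker) would wait forever.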
The second, "self" call is preceded by this transport exception in the log (probably because I killed the consumers):
2007-01-25 13:29:46,662 [127.0.0.1:56489] DEBUG Transport - Transport failed: java.io.IOException: The transport tcp:///127.0.0.1:56486 of type: org.apache.activemq.transport.tcp.TcpTransport is not running.
java.io.IOException: The transport tcp:///127.0.0.1:56486 of type: org.apache.activemq.transport.tcp.TcpTransport is not running.
    at org.apache.activemq.transport.TransportSupport.checkStarted(TransportSupport.java:109)
    at org.apache.activemq.transport.tcp.TcpTransport.oneway(TcpTransport.java:117)
    at org.apache.activemq.transport.InactivityMonitor.oneway(InactivityMonitor.java:142)
    at org.apache.activemq.transport.TransportFilter.oneway(TransportFilter.java:82)
    at org.apache.activemq.transport.WireFormatNegotiator.oneway(WireFormatNegotiator.java:87)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:45)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:59)
    at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:215)
    at org.apache.activemq.broker.AbstractConnection.processDispatch(AbstractConnection.java:722)
    at org.apache.activemq.broker.AbstractConnection.dispatchSync(AbstractConnection.java:699)
    at org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:311)
    at org.apache.activemq.broker.region.QueueSubscription.dispatch(QueueSubscription.java:152)
    at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:68)
    at org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy.dispatch(RoundRobinDispatchPolicy.java:54)
    at org.apache.activemq.broker.region.Queue.removeSubscription(Queue.java:233)
    at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:208)
    at org.apache.activemq.broker.region.RegionBroker.removeConsumer(RegionBroker.java:317)
    at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
    at org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:210)
    at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
    at org.apache.activemq.broker.MutableBrokerFilter.removeConsumer(MutableBrokerFilter.java:115)
    at org.apache.activemq.broker.AbstractConnection.processRemoveConsumer(AbstractConnection.java:553)
    at org.apache.activemq.broker.AbstractConnection.processRemoveSession(AbstractConnection.java:590)
    at org.apache.activemq.broker.AbstractConnection.processRemoveConnection(AbstractConnection.java:660)
    at org.apache.activemq.broker.AbstractConnection.stop(AbstractConnection.java:169)
    at org.apache.activemq.broker.TransportConnection.stop(TransportConnection.java:98)
    at org.apache.activemq.broker.jmx.ManagedTransportConnection.stop(ManagedTransportConnection.java:63)
    at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
    at org.apache.activemq.broker.AbstractConnection.serviceTransportException(AbstractConnection.java:183)
    at org.apache.activemq.broker.TransportConnection$1.onException(TransportConnection.java:67)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:111)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:98)
    at org.apache.activemq.transport.WireFormatNegotiator.onException(WireFormatNegotiator.java:130)
    at org.apache.activemq.transport.InactivityMonitor.onException(InactivityMonitor.java:150)
    at org.apache.activemq.transport.TransportSupport.onException(TransportSupport.java:101)
    at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:150)
    at java.lang.Thread.run(Thread.java:595)
I also printed the stack at the point where the "self" call to removeSubscription happens:
    at org.apache.activemq.broker.region.Queue.removeSubscription(Queue.java:181)
    at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:208)
    at org.apache.activemq.broker.region.RegionBroker.removeConsumer(RegionBroker.java:317)
    at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
    at org.apache.activemq.advisory.AdvisoryBroker.removeConsumer(AdvisoryBroker.java:210)
    at org.apache.activemq.broker.BrokerFilter.removeConsumer(BrokerFilter.java:102)
    at org.apache.activemq.broker.MutableBrokerFilter.removeConsumer(MutableBrokerFilter.java:115)
    at org.apache.activemq.broker.AbstractConnection.processRemoveConsumer(AbstractConnection.java:553)
    at org.apache.activemq.broker.AbstractConnection.processRemoveSession(AbstractConnection.java:590)
    at org.apache.activemq.broker.AbstractConnection.processRemoveConnection(AbstractConnection.java:660)
    at org.apache.activemq.broker.AbstractConnection.stop(AbstractConnection.java:169)
    at org.apache.activemq.broker.TransportConnection.stop(TransportConnection.java:98)
    at org.apache.activemq.broker.jmx.ManagedTransportConnection.stop(ManagedTransportConnection.java:63)
    at org.apache.activemq.util.ServiceSupport.dispose(ServiceSupport.java:40)
    at org.apache.activemq.broker.AbstractConnection.serviceTransportException(AbstractConnection.java:183)
    at org.apache.activemq.broker.AbstractConnection.serviceException(AbstractConnection.java:191)
    at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:218)
    at org.apache.activemq.broker.AbstractConnection.processDispatch(AbstractConnection.java:722)
    at org.apache.activemq.broker.AbstractConnection.dispatchSync(AbstractConnection.java:699)
    at org.apache.activemq.broker.region.PrefetchSubscription.dispatch(PrefetchSubscription.java:311)
    at org.apache.activemq.broker.region.QueueSubscription.dispatch(QueueSubscription.java:152)
    at org.apache.activemq.broker.region.PrefetchSubscription.add(PrefetchSubscription.java:68)
    at org.apache.activemq.broker.region.policy.RoundRobinDispatchPolicy.dispatch(RoundRobinDispatchPolicy.java:54)
    at org.apache.activemq.broker.region.Queue.removeSubscription(Queue.java:233)
    at org.apache.activemq.broker.region.AbstractRegion.removeConsumer(AbstractRegion.java:208)
It seems that serviceTransportException (AbstractConnection.java:183) is where the self call is triggered.
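
For discussion only, here is a sketch of one direction that might avoid the self call: hand the cleanup of the failed connection to another thread, so the dispatching thread can return and reach turnOn(). This is an assumption on my part, not a tested ActiveMQ patch. SimpleValve, bothRemovalsComplete and the single-thread executor are all hypothetical, and whether the real broker can safely defer the stop this way is an open question.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class DeferredRemovalSketch {

    /** Hypothetical stand-in for the broker's valve; NOT the ActiveMQ class. */
    static final class SimpleValve {
        private boolean on = true;

        // Waits until the valve is on, then switches it off.
        synchronized void turnOff() throws InterruptedException {
            while (!on) {
                wait();
            }
            on = false;
        }

        // Switches the valve back on and wakes any waiting threads.
        synchronized void turnOn() {
            on = true;
            notifyAll();
        }
    }

    // Simulates a removal whose dispatch step hits a dead transport. Instead
    // of removing the failed consumer inline (the self call), the second
    // removal is queued on a separate cleanup thread.
    public static boolean bothRemovalsComplete(long timeoutMs) {
        SimpleValve valve = new SimpleValve();
        ExecutorService cleanup = Executors.newSingleThreadExecutor();
        try {
            valve.turnOff(); // outer removal closes the valve

            // "Dispatch failed": defer the second removal to another thread.
            Future<?> nested = cleanup.submit(() -> {
                valve.turnOff(); // waits until the outer removal turns the valve on
                valve.turnOn();
                return null;
            });

            valve.turnOn(); // outer removal reaches its turnOn() as intended

            // The deferred removal can now proceed; wait for it to finish.
            nested.get(timeoutMs, TimeUnit.MILLISECONDS);
            return true;
        } catch (Exception e) {
            // Timeout, interruption, or a failure in the deferred task.
            return false;
        } finally {
            cleanup.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("deferred removal completes: " + bothRemovalsComplete(500));
    }
}
```

In this sketch both removals finish, because the valve is never re-entered by the thread that closed it. The trade-off is that the failed connection is cleaned up slightly later, on a different thread.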
Could someone look into this and let me know if there is a simple fix for this deadlock condition? Thanks in advance.