[
https://issues.apache.org/jira/browse/AMQ-5260?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14149239#comment-14149239
]
matteo rulli commented on AMQ-5260:
-----------------------------------
Thank you for your prompt reply! Now I understand the role of MutexTransport.
By the way, I noticed that the deadlock involves two MutexTransport instances
that wrap *VMTransport(s)*. Apparently if I apply the following hack
{code:java}
--- MutexTransport.java Thu Jun 05 14:48:36 2014
+++ MutexTransport.edited.java Fri Sep 26 09:11:41 2014
@@ -19,17 +19,31 @@
import java.io.IOException;
import java.util.concurrent.locks.ReentrantLock;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Thread safe Transport Filter that serializes calls to and from the
Transport Stack.
*/
public class MutexTransport extends TransportFilter {
- private final ReentrantLock writeLock = new ReentrantLock();
+ private static final Logger LOG =
LoggerFactory.getLogger(MutexTransport.class);
+ private final static ReentrantLock vmWriteLock = new ReentrantLock();
+ private ReentrantLock writeLock;
private boolean syncOnCommand;
public MutexTransport(Transport next) {
super(next);
this.syncOnCommand = false;
+
+ writeLock = null;
+ if(next != null && next.toString().startsWith("vm://")){
+ writeLock = vmWriteLock;
+ LOG.error("#@mrul# vm transport with mutex: " + next);
+ }else{
+ writeLock = new ReentrantLock();
+ LOG.error("#@mrul# non-vm transport with mutex: " + next);
+ }
}
public MutexTransport(Transport next, boolean syncOnCommand) {
{code}
the deadlock disappears. I noticed that the network bridge create many
VMTransport of the form vm://brokerName#<number> using basically three
different thread paths and all these local VMTransports trigger many
intertwined command exchanges that somehow determine the deadlock:
h1. VMTransports creation PATH 1
{noformat}
java.lang.Thread.getStackTrace(Unknown Source)
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.transport.vm.VMTransportServer$1.<init>(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.network.NetworkConnector.createLocalTransport(NetworkConnector.java:154)
org.apache.activemq.network.DiscoveryNetworkConnector.onServiceAdd(DiscoveryNetworkConnector.java:136)
org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent.start(SimpleDiscoveryAgent.java:89)
org.apache.activemq.network.DiscoveryNetworkConnector.handleStart(DiscoveryNetworkConnector.java:205)
org.apache.activemq.network.NetworkConnector$1.doStart(NetworkConnector.java:59)
org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
org.apache.activemq.network.NetworkConnector.start(NetworkConnector.java:159)
org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2501)
org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:693)
org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:659)
org.apache.activemq.broker.BrokerService.start(BrokerService.java:595)
org.apache.activemq.JmsMultipleBrokersTestSupport.startAllBrokers(JmsMultipleBrokersTestSupport.java:277)
{noformat}
h1. VMTransports creation PATH 2
{noformat}
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.transport.vm.VMTransportServer$1.<init>(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.network.NetworkBridgeFactory.createLocalTransport(NetworkBridgeFactory.java:80)
org.apache.activemq.network.DemandForwardingBridgeSupport.start(DemandForwardingBridgeSupport.java:184)
org.apache.activemq.network.DiscoveryNetworkConnector.onServiceAdd(DiscoveryNetworkConnector.java:152)
org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent.start(SimpleDiscoveryAgent.java:89)
org.apache.activemq.network.DiscoveryNetworkConnector.handleStart(DiscoveryNetworkConnector.java:205)
org.apache.activemq.network.NetworkConnector$1.doStart(NetworkConnector.java:59)
org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
org.apache.activemq.network.NetworkConnector.start(NetworkConnector.java:159)
org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2501)
org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:693)
org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:659)
org.apache.activemq.broker.BrokerService.start(BrokerService.java:595)
org.apache.activemq.JmsMultipleBrokersTestSupport.startAllBrokers(JmsMultipleBrokersTestSupport.java:277)
{noformat}
h1. VMTransports creation PATH 3
{noformat}
java.lang.Thread.getStackTrace(Unknown Source)
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.transport.vm.VMTransportServer$1.<init>(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:77)
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.network.NetworkBridgeFactory.createLocalTransport(NetworkBridgeFactory.java:80)
org.apache.activemq.broker.TransportConnection.processBrokerInfo(TransportConnection.java:1321)
org.apache.activemq.command.BrokerInfo.visit(BrokerInfo.java:126)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:61)
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.lang.Thread.run(Unknown Source)
{noformat}
h1. VMTransports creation PATH 3, take 2 (cyclic???)
{noformat}
org.apache.activemq.broker.TransportConnection.processBrokerInfo(TransportConnection.java:1338)
java.lang.Thread.getStackTrace(Unknown Source)
org.apache.activemq.transport.vm.VMTransport.<init>(VMTransport.java:73)
org.apache.activemq.command.BrokerInfo.visit(BrokerInfo.java:126)
<------------------------------ !!!
org.apache.activemq.transport.vm.VMTransportServer.connect(VMTransportServer.java:88)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
org.apache.activemq.transport.vm.VMTransportFactory.doCompositeConnect(VMTransportFactory.java:147)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
org.apache.activemq.transport.vm.VMTransportFactory.doConnect(VMTransportFactory.java:54)
org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:61)
org.apache.activemq.transport.TransportFactory.connect(TransportFactory.java:64)
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
org.apache.activemq.network.NetworkBridgeFactory.createLocalTransport(NetworkBridgeFactory.java:80)
org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
org.apache.activemq.broker.TransportConnection.processBrokerInfo(TransportConnection.java:1321)
org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
org.apache.activemq.command.BrokerInfo.visit(BrokerInfo.java:126)
<------------------------------ !!!
org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:294)
org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:148)
org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:61)
org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.lang.Thread.run(Unknown Source)
{noformat}
So, as a tentative fix I tried to globally lock VMTransport usage under the
assumption that VM transport should be negligible in number with respect to
other transports (tcp, nio, etc.) and thus the static lock in VM trans. should
not constitute a big performace bottleneck; I added a new constructor in
_*MutexTransport*_ in this way:
{code}
public class MutexTransport extends TransportFilter {
// ...
private final static ReentrantLock staticWriteLock = new
ReentrantLock();
private ReentrantLock writeLock = new ReentrantLock();
// ...
public MutexTransport(Transport next, boolean syncOnCommand, boolean
useStaticLock) {
this(next, syncOnCommand);
if(useStaticLock)
writeLock = staticWriteLock;
}
{code}
and I modified the
_*org.apache.activemq.transport.vm.VMTransportServer.configure(Transport)*_
implementation in this way:
{code}
public static Transport configure(Transport transport) {
transport = new MutexTransport(transport, false, true); // use static locks
for VMTransport
transport = new ResponseCorrelator(transport);
return transport;
}
{code}
Apparently this solve the problem (anyway I'm going to perform additional tests
to double check this). Is it in your opinion a reasonable approach or is it
just a _quick and dirty_ solution?
> Cross talk over duplex network connection can lead to blocking (alternative
> take)
> ---------------------------------------------------------------------------------
>
> Key: AMQ-5260
> URL: https://issues.apache.org/jira/browse/AMQ-5260
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.9.0, 5.10.0
> Reporter: matteo rulli
> Attachments: AMQ5260AdvancedTest.java, AMQ_5260.patch,
> AMQ_5260_2.patch, AMQ_5260_3.patch, deadlock.jpg, debug.jpg
>
>
> Pretty the same description with respect to AMQ-4328.
> ----
> !deadlock.jpg!
> h2. Stacktraces:
> Stacktrace no.1:
> {noformat}
> Name: ActiveMQ NIO Worker 12
> State: BLOCKED on java.net.URI@1bae2b28 owned by: ActiveMQ Transport:
> tcp:///10.0.1.219:61616@57789
> Total blocked: 2 Total waited: 67
> Stack trace:
>
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:714)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:581)
> org.apache.activemq.network.DemandForwardingBridgeSupport$3.onCommand(DemandForwardingBridgeSupport.java:191)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
> org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> org.apache.activemq.transport.nio.NIOTransport.serviceRead(NIOTransport.java:138)
> org.apache.activemq.transport.nio.NIOTransport$1.onSelect(NIOTransport.java:69)
> org.apache.activemq.transport.nio.SelectorSelection.onSelect(SelectorSelection.java:94)
> org.apache.activemq.transport.nio.SelectorWorker$1.run(SelectorWorker.java:119)
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> java.lang.Thread.run(Unknown Source)
> {noformat}
> ----
> stack trace no.2
> {noformat}
> Name: ActiveMQ Transport: tcp:///10.0.1.219:61616@57789
> State: WAITING on
> java.util.concurrent.locks.ReentrantLock$NonfairSync@3cdbfa3e owned by:
> ActiveMQ BrokerService[master2] Task-4
> Total blocked: 19 Total waited: 3
> Stack trace:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(Unknown Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown
> Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Unknown
> Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(Unknown Source)
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(Unknown Source)
> java.util.concurrent.locks.ReentrantLock.lock(Unknown Source)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66)
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1339)
> org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:858)
> org.apache.activemq.broker.TransportConnection.dispatchSync(TransportConnection.java:818)
> org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:151)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:138)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:127)
> - locked java.util.concurrent.atomic.AtomicBoolean@689389da
> org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:104)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
> org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
> org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
> org.apache.activemq.network.DemandForwardingBridgeSupport.addSubscription(DemandForwardingBridgeSupport.java:856)
> org.apache.activemq.network.DemandForwardingBridgeSupport.addConsumerInfo(DemandForwardingBridgeSupport.java:1128)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteConsumerAdvisory(DemandForwardingBridgeSupport.java:714)
> - locked java.net.URI@1bae2b28
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceRemoteCommand(DemandForwardingBridgeSupport.java:581)
> org.apache.activemq.network.DemandForwardingBridgeSupport$3.onCommand(DemandForwardingBridgeSupport.java:191)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.failover.FailoverTransport$3.onCommand(FailoverTransport.java:196)
> org.apache.activemq.transport.WireFormatNegotiator.onCommand(WireFormatNegotiator.java:113)
> org.apache.activemq.transport.AbstractInactivityMonitor.onCommand(AbstractInactivityMonitor.java:270)
> org.apache.activemq.transport.TransportSupport.doConsume(TransportSupport.java:83)
> org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:214)
> org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:196)
> java.lang.Thread.run(Unknown Source)
> {noformat}
> ----
> stack trace no.3
> {noformat}
> Name: ActiveMQ BrokerService[master2] Task-4
> State: WAITING on
> java.util.concurrent.locks.ReentrantLock$NonfairSync@717bb9c owned by:
> ActiveMQ Transport: tcp:///10.0.1.219:61616@57789
> Total blocked: 19 Total waited: 117
> Stack trace:
> sun.misc.Unsafe.park(Native Method)
> java.util.concurrent.locks.LockSupport.park(Unknown Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.parkAndCheckInterrupt(Unknown
> Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquireQueued(Unknown
> Source)
> java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(Unknown Source)
> java.util.concurrent.locks.ReentrantLock$NonfairSync.lock(Unknown Source)
> java.util.concurrent.locks.ReentrantLock.lock(Unknown Source)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:66)
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> org.apache.activemq.network.DemandForwardingBridgeSupport.serviceLocalCommand(DemandForwardingBridgeSupport.java:921)
> org.apache.activemq.network.DemandForwardingBridgeSupport$2.onCommand(DemandForwardingBridgeSupport.java:173)
> org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:116)
> org.apache.activemq.transport.MutexTransport.onCommand(MutexTransport.java:50)
> org.apache.activemq.transport.vm.VMTransport.doDispatch(VMTransport.java:138)
> org.apache.activemq.transport.vm.VMTransport.dispatch(VMTransport.java:127)
> - locked java.util.concurrent.atomic.AtomicBoolean@453bb109
> org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:104)
> org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
> org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
> org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:1339)
> org.apache.activemq.broker.TransportConnection.processDispatch(TransportConnection.java:858)
> org.apache.activemq.broker.TransportConnection.iterate(TransportConnection.java:904)
> org.apache.activemq.thread.PooledTaskRunner.runTask(PooledTaskRunner.java:129)
> org.apache.activemq.thread.PooledTaskRunner$1.run(PooledTaskRunner.java:47)
> java.util.concurrent.ThreadPoolExecutor$Worker.runTask(Unknown Source)
> java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
> java.lang.Thread.run(Unknown Source)
> {noformat}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)