Stirling Chow created AMQ-4159:
----------------------------------
Summary: Lack of thread-safety in SimpleDiscoveryAgent can cause multiple concurrent attempts to establish bridge, which can result in permanent bridge failure
Key: AMQ-4159
URL: https://issues.apache.org/jira/browse/AMQ-4159
Project: ActiveMQ
Issue Type: Bug
Affects Versions: 5.8.0
Reporter: Stirling Chow
Symptom
=======
I was diagnosing a deadlock issue in {{DiscoveryNetworkConnector}} and noticed
that during one of the tests, concurrent calls were being made to
{{DiscoveryNetworkConnector.onServiceAdd}} for the same {{DiscoveryEvent}}.
This was unexpected because only a single service (URL) had been given to
{{SimpleDiscoveryAgent}}. In fact, during one of the tests I observed dozens
of concurrent calls.
Concurrent attempts to establish a bridge to the *same* remote broker are
problematic because they expose a number of race conditions in
{{DiscoveryNetworkConnector}} and {{RegionBroker}} that can lead to permanent
bridge failure, as well as unnecessary thread pool execution/resource usage and
logging.
The race conditions will be filed as separate issues. This issue specifically
addresses the bug that causes {{SimpleDiscoveryAgent}} to uncontrollably
multiply bridge connection attempts.
Cause
=====
When {{DemandForwardingBridgeSupport}} handles an exception from either the local
or the remote side of the bridge, it schedules disposal of the controlling service
on the task runner and fires a "bridge failed" event:
{code:title=DemandForwardingBridgeSupport.java}
public void serviceLocalException(Throwable error) {
    if (!disposed.get()) {
        LOG.info("Network connection between " + localBroker + " and " +
                remoteBroker + " shutdown due to a local error: " + error);
        LOG.debug("The local Exception was:" + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}

public void serviceRemoteException(Throwable error) {
    if (!disposed.get()) {
        if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
            LOG.error("Network connection between " + localBroker + " and " +
                    remoteBroker + " shutdown due to a remote error: " + error);
        } else {
            LOG.warn("Network connection between " + localBroker + " and " +
                    remoteBroker + " shutdown due to a remote error: " + error);
        }
        LOG.debug("The remote Exception was: " + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}

private void fireBridgeFailed() {
    NetworkBridgeListener l = this.networkBridgeListener;
    if (l != null) {
        l.bridgeFailed();
    }
}
{code}
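As far as the excerpt shows, {{disposed}} is only read in these handlers; the disposal itself is handed to the task runner and happens later. A local and a remote exception that both arrive before that disposal could therefore each pass the {{!disposed.get()}} check and each call {{fireBridgeFailed()}}. The standalone sketch below (plain Java, no ActiveMQ classes; {{BridgeFailureGuardDemo}} and its methods are hypothetical) contrasts that check-then-act pattern with claiming the notification through {{AtomicBoolean.compareAndSet}}, under the simplifying assumption that the flag is never set before both callers run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class BridgeFailureGuardDemo {

    // Counts how many "bridge failed" notifications reach the listener.
    static final AtomicInteger notifications = new AtomicInteger();

    // Mirrors the excerpt: the flag is only read here; in the real code it is
    // set later, asynchronously, when the controlling service is disposed.
    static void checkThenAct(AtomicBoolean disposed) {
        if (!disposed.get()) {
            notifications.incrementAndGet();
        }
    }

    // Alternative: atomically claim the right to report the failure once.
    static void claimThenAct(AtomicBoolean reported) {
        if (reported.compareAndSet(false, true)) {
            notifications.incrementAndGet();
        }
    }

    // Runs two callers (standing in for serviceLocalException and
    // serviceRemoteException) and returns how many notifications were fired.
    static int run(boolean useCas) throws InterruptedException {
        notifications.set(0);
        AtomicBoolean flag = new AtomicBoolean(false);
        CountDownLatch start = new CountDownLatch(1);
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (int i = 0; i < 2; i++) {
            pool.execute(() -> {
                try {
                    start.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
                if (useCas) {
                    claimThenAct(flag);
                } else {
                    checkThenAct(flag);
                }
            });
        }
        start.countDown();
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        return notifications.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("check-then-act notifications: " + run(false));
        System.out.println("compareAndSet notifications:  " + run(true));
    }
}
```

With check-then-act both callers get through; with {{compareAndSet}} exactly one does, regardless of how the two threads interleave.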
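{{DiscoveryNetworkConnector}} supplies the {{NetworkBridgeListener}} (an inner
{{DiscoverNetworkBridgeListener}} created in {{createBridge(...)}}), and its
{{bridgeFailed()}} method calls back to
{{SimpleDiscoveryAgent.serviceFailed(...)}} unless the connector has been stopped:
{code:title=DiscoveryNetworkConnector.java}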
protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport,
        final DiscoveryEvent event) {
    class DiscoverNetworkBridgeListener extends MBeanNetworkListener {

        public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
            super(brokerService, connectorName);
        }

        public void bridgeFailed() {
            if (!serviceSupport.isStopped()) {
                try {
                    discoveryAgent.serviceFailed(event);
                } catch (IOException e) {
                }
            }
        }
    }
    ...
{code}
In response, {{SimpleDiscoveryAgent.serviceFailed(...)}} pauses for the
{{reconnectDelay}} before attempting to re-establish the bridge via
{{DiscoveryNetworkConnector.onServiceAdd(...)}}:
{code:title=SimpleDiscoveryAgent.java}
public void serviceFailed(DiscoveryEvent devent) throws IOException {
    final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
    if (event.failed.compareAndSet(false, true)) {
        listener.onServiceRemove(event);
        taskRunner.execute(new Runnable() {
            public void run() {
                // We detect a failed connection attempt because the service
                // fails right away.
                if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
                    LOG.debug("Failure occurred soon after the discovery event was generated. "
                            + "It will be classified as a connection failure: " + event);
                    ...
                    synchronized (sleepMutex) {
                        try {
                            if (!running.get()) {
                                LOG.debug("Reconnecting disabled: stopped");
                                return;
                            }
                            LOG.debug("Waiting " + event.reconnectDelay
                                    + " ms before attempting to reconnect.");
                            sleepMutex.wait(event.reconnectDelay);
                        } catch (InterruptedException ie) {
                            LOG.debug("Reconnecting disabled: " + ie);
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                ...
                event.connectTime = System.currentTimeMillis();
                event.failed.set(false);
                listener.onServiceAdd(event);
            }
        }, "Simple Discovery Agent");
    }
}
{code}
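The {{compareAndSet(false, true)}} on {{failed}} lets only one reconnect task be scheduled per failure cycle, but only while the flag is {{true}}: the task calls {{failed.set(false)}} just before {{listener.onServiceAdd(event)}}, so any later {{serviceFailed(...)}} for the same event object, whichever bridge reports it, schedules another reconnect. The simplified model below reproduces just that guard-and-reset cycle. It is a sketch, not ActiveMQ code: {{ReconnectGuardDemo}} and {{Event}} are hypothetical, the minimum-connect-time logic and the elided parts of the excerpt are left out, and a {{Semaphore}} release stands in for the listener callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class ReconnectGuardDemo {

    // Hypothetical stand-in for SimpleDiscoveryEvent: only the fields the excerpt touches.
    static final class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);
        volatile long connectTime = System.currentTimeMillis();
        final long reconnectDelay;

        Event(long reconnectDelay) {
            this.reconnectDelay = reconnectDelay;
        }
    }

    private final Object sleepMutex = new Object();
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final ExecutorService taskRunner = Executors.newCachedThreadPool();
    final AtomicInteger reconnectsScheduled = new AtomicInteger();
    // Released where the real code would call listener.onServiceAdd(event).
    final Semaphore serviceAdded = new Semaphore(0);

    // Simplified model of serviceFailed(...); the minConnectTime logic is omitted.
    void serviceFailed(Event event) {
        if (event.failed.compareAndSet(false, true)) {
            reconnectsScheduled.incrementAndGet();
            taskRunner.execute(() -> {
                synchronized (sleepMutex) {
                    try {
                        if (!running.get()) {
                            return;
                        }
                        sleepMutex.wait(event.reconnectDelay);
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
                event.connectTime = System.currentTimeMillis();
                event.failed.set(false); // re-arms the guard for the next failure
                serviceAdded.release();
            });
        }
    }

    // Issues `callers` concurrent serviceFailed(...) calls for one event and waits for them.
    void failConcurrently(Event event, int callers) throws InterruptedException {
        CountDownLatch start = new CountDownLatch(1);
        CountDownLatch done = new CountDownLatch(callers);
        for (int i = 0; i < callers; i++) {
            new Thread(() -> {
                try {
                    start.await();
                    serviceFailed(event);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    done.countDown();
                }
            }).start();
        }
        start.countDown();
        done.await();
    }

    void shutdown() throws InterruptedException {
        running.set(false);
        taskRunner.shutdown();
        taskRunner.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws InterruptedException {
        ReconnectGuardDemo agent = new ReconnectGuardDemo();
        Event event = new Event(300);
        for (int cycle = 1; cycle <= 2; cycle++) {
            agent.failConcurrently(event, 10);
            agent.serviceAdded.tryAcquire(5, TimeUnit.SECONDS);
            System.out.println("cycle " + cycle + ": reconnects scheduled so far = "
                    + agent.reconnectsScheduled.get());
        }
        agent.shutdown();
    }
}
```

Run as written, ten concurrent reports in the first cycle should schedule a single reconnect, and a second burst after the reset should schedule exactly one more. The 300 ms delay is an arbitrary value chosen only to outlast each burst of callers; it is not ActiveMQ's default {{reconnectDelay}}.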
--
This message is automatically generated by JIRA.