[ https://issues.apache.org/jira/browse/AMQ-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Stirling Chow updated AMQ-4159:
-------------------------------
Description:
Symptom
=======
I was diagnosing a deadlock issue in {{DiscoveryNetworkConnector}} and noticed
that during one of the tests, concurrent calls were being made to
{{DiscoveryNetworkConnector.onServiceAdd(...)}} for the same
{{DiscoveryEvent}}. This was unexpected because only a single service (URL)
had been given to {{SimpleDiscoveryAgent}}. In fact, during one of the tests I
observed dozens of concurrent calls.
Concurrent attempts to establish a bridge to the *same* remote broker are
problematic because they expose a number of race conditions in
{{DiscoveryNetworkConnector}} and {{RegionBroker}} that can lead to permanent
bridge failure (see AMQ-4160), as well as unnecessary thread pool
execution/resource usage and logging.
The issues with {{DiscoveryNetworkConnector}} and {{RegionBroker}} will be
filed as separate issues. This issue specifically addresses the bug that
causes {{SimpleDiscoveryAgent}} to uncontrollably multiply bridge connection
attempts.
Cause
=====
When {{DemandForwardingBridgeSupport}} handles exceptions from either the local
or remote side of the bridge, it fires a "bridge failed" event:
{code:title=DemandForwardingBridgeSupport.java}
public void serviceLocalException(Throwable error) {
    if (!disposed.get()) {
        LOG.info("Network connection between " + localBroker + " and "
                + remoteBroker + " shutdown due to a local error: " + error);
        LOG.debug("The local Exception was:" + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}

public void serviceRemoteException(Throwable error) {
    if (!disposed.get()) {
        if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
            LOG.error("Network connection between " + localBroker + " and "
                    + remoteBroker + " shutdown due to a remote error: " + error);
        } else {
            LOG.warn("Network connection between " + localBroker + " and "
                    + remoteBroker + " shutdown due to a remote error: " + error);
        }
        LOG.debug("The remote Exception was: " + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}

private void fireBridgeFailed() {
    NetworkBridgeListener l = this.networkBridgeListener;
    if (l != null) {
        l.bridgeFailed();
    }
}
{code}
{{DiscoveryNetworkConnector}} is the {{NetworkBridgeListener}}, and its
{{bridgeFailed()}} method calls back to
{{SimpleDiscoveryAgent.serviceFailed(...)}}:
{code:title=DiscoveryNetworkConnector.java}
protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport,
        final DiscoveryEvent event) {
    class DiscoverNetworkBridgeListener extends MBeanNetworkListener {
        public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
            super(brokerService, connectorName);
        }

        public void bridgeFailed() {
            if (!serviceSupport.isStopped()) {
                try {
                    discoveryAgent.serviceFailed(event);
                } catch (IOException e) {
                }
            }
        }
    }
    ...
{code}
In response, {{SimpleDiscoveryAgent.serviceFailed(...)}} pauses for the
{{reconnectDelay}} before attempting to re-establish the bridge via
{{DiscoveryNetworkConnector.onServiceAdd(...)}}:
{code:title=SimpleDiscoveryAgent.java}
public void serviceFailed(DiscoveryEvent devent) throws IOException {
    final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
    if (event.failed.compareAndSet(false, true)) {
        listener.onServiceRemove(event);
        taskRunner.execute(new Runnable() {
            public void run() {
                // We detect a failed connection attempt because the service
                // fails right away.
                if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
                    LOG.debug("Failure occurred soon after the discovery event was generated. It will be classified as a connection failure: " + event);
                    ...
                    synchronized (sleepMutex) {
                        try {
                            if (!running.get()) {
                                LOG.debug("Reconnecting disabled: stopped");
                                return;
                            }
                            LOG.debug("Waiting " + event.reconnectDelay + " ms before attempting to reconnect.");
                            sleepMutex.wait(event.reconnectDelay);
                        } catch (InterruptedException ie) {
                            LOG.debug("Reconnecting disabled: " + ie);
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
                ...
                event.connectTime = System.currentTimeMillis();
                event.failed.set(false);
                listener.onServiceAdd(event);
            }
        }, "Simple Discovery Agent");
    }
}
{code}
*NOTE*: the call to {{listener.onServiceAdd(...)}} is made by a new thread!
There are two race conditions that allow
{{SimpleDiscoveryAgent.serviceFailed(...)}} to launch more than one thread,
each attempting to re-establish the same bridge.
First, note that
{{DemandForwardingBridgeSupport.serviceLocal/RemoteException(...)}} launches a
separate thread that stops the bridge:
{code:title=DemandForwardingBridgeSupport.java}
public void serviceRemoteException(Throwable error) {
    if (!disposed.get()) {
        if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
            LOG.error("Network connection between " + localBroker + " and "
                    + remoteBroker + " shutdown due to a remote error: " + error);
        } else {
            LOG.warn("Network connection between " + localBroker + " and "
                    + remoteBroker + " shutdown due to a remote error: " + error);
        }
        LOG.debug("The remote Exception was: " + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}

public void stop() throws Exception {
    if (started.compareAndSet(true, false)) {
        if (disposed.compareAndSet(false, true)) {
            LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to " + remoteBrokerName);
            NetworkBridgeListener l = this.networkBridgeListener;
            if (l != null) {
                l.onStop(this);
            }
            ...
{code}
When the bridge stops, the {{disposed}} flag is set, which prevents subsequent
calls to {{serviceLocal/RemoteException(...)}} from calling
{{fireBridgeFailed()}}. However, since the call to
{{DemandForwardingBridgeSupport.stop()}} is made by a separate thread, multiple
{{serviceLocal/RemoteException(...)}} calls that are made in quick succession
can result in multiple calls to {{fireBridgeFailed()}}.
This is the first race condition: multiple calls can be made to
{{DiscoveryNetworkConnector.bridgeFailed()}} for the same bridge. By
transitivity, this results in multiple calls to
{{SimpleDiscoveryAgent.serviceFailed(...)}}.
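To make the first race concrete, here is a minimal, single-threaded sketch. It is a simplified model, not ActiveMQ code: {{BridgeFailedRace}}, {{Bridge}} and the {{pending}} queue are hypothetical, and the queue stands in for the broker's task runner so that the deferred dispose deterministically runs only after all four exceptions from {{startRemoteBridge()}} have been handled. It also models the optional {{failed.compareAndSet(false, true)}} guard in {{fireBridgeFailed()}} that is mentioned under Solution.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the check-then-act in serviceRemoteException(...). */
public class BridgeFailedRace {
    // Stand-in for the task runner: queued tasks only run when drained,
    // which makes the interleaving deterministic.
    static final Queue<Runnable> pending = new ArrayDeque<>();

    static class Bridge {
        final AtomicBoolean disposed = new AtomicBoolean(false);
        final AtomicBoolean failed = new AtomicBoolean(false); // only used when guarded
        final AtomicInteger bridgeFailedCalls = new AtomicInteger();
        final boolean guarded;

        Bridge(boolean guarded) { this.guarded = guarded; }

        void serviceRemoteException(Throwable error) {
            if (!disposed.get()) {
                // Disposal is handed off to another thread, so disposed stays
                // false for every exception that arrives before it runs.
                pending.add(() -> disposed.set(true));
                fireBridgeFailed();
            }
        }

        void fireBridgeFailed() {
            // Optional guard: report the failure at most once per bridge.
            if (guarded && !failed.compareAndSet(false, true)) {
                return;
            }
            bridgeFailedCalls.incrementAndGet(); // would call listener.bridgeFailed()
        }
    }

    static int run(boolean guarded) {
        pending.clear();
        Bridge bridge = new Bridge(guarded);
        // startRemoteBridge() sends four oneway commands; model each one failing.
        for (int i = 0; i < 4; i++) {
            bridge.serviceRemoteException(new java.io.IOException("oneway " + i + " failed"));
        }
        // Only now does the deferred dispose run.
        while (!pending.isEmpty()) {
            pending.poll().run();
        }
        return bridge.bridgeFailedCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("unguarded bridgeFailed() calls: " + run(false)); // 4
        System.out.println("guarded bridgeFailed() calls:   " + run(true));  // 1
    }
}
```

Because the {{disposed}} check and the dispose that sets it are separated by a thread hand-off, every exception arriving in that window reports a failure unless something else latches the first report.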
{{SimpleDiscoveryAgent.serviceFailed(...)}} has a guard clause,
{{event.failed.compareAndSet(false, true)}}, which should only allow the first
call to launch a bridge reconnect thread. However, once the {{reconnectDelay}}
expires, {{event.failed}} is reset to {{false}}, which allows re-entry to the
failure handling logic and the possible launching of additional bridge
reconnect threads if the {{reconnectDelay}} is short or the threads calling
{{serviceFailed(...)}} are delayed.
This is the second race condition: the guard clause in
{{SimpleDiscoveryAgent.serviceFailed(...)}} can be reset before the subsequent
redundant calls have been filtered out.
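The guard reset can be reproduced deterministically with another hypothetical model (not the real {{SimpleDiscoveryAgent}}): {{GuardResetRace}}, {{Event}} and the queue-backed {{taskRunner}} are illustrative names, and the {{reconnectDelay}} wait is reduced to the point at which queued tasks are run. A duplicate {{serviceFailed(...)}} call that arrives after the reconnect task has re-armed the flag on the same event is accepted as a new failure.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the pre-patch guard in serviceFailed(...). */
public class GuardResetRace {
    static class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);
    }

    // Deferred stand-in for the "Simple Discovery Agent" task runner.
    final Queue<Runnable> taskRunner = new ArrayDeque<>();
    final AtomicInteger onServiceAddCalls = new AtomicInteger();

    void serviceFailed(Event event) {
        if (event.failed.compareAndSet(false, true)) {
            taskRunner.add(() -> {
                // ... reconnectDelay would elapse here ...
                event.failed.set(false);              // re-arms the guard on the SAME event
                onServiceAddCalls.incrementAndGet();  // models listener.onServiceAdd(event)
            });
        }
    }

    void runPendingTasks() {
        while (!taskRunner.isEmpty()) {
            taskRunner.poll().run();
        }
    }

    public static void main(String[] args) {
        GuardResetRace agent = new GuardResetRace();
        Event event = new Event();

        agent.serviceFailed(event);  // first bridgeFailed(): schedules a reconnect
        agent.serviceFailed(event);  // duplicate arriving early: filtered by the guard
        agent.runPendingTasks();     // delay expires, guard is reset

        agent.serviceFailed(event);  // late duplicate of the same failure: accepted
        agent.runPendingTasks();

        System.out.println("onServiceAdd calls: " + agent.onServiceAddCalls.get()); // 2
    }
}
```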
These two race conditions allow a single call to
{{DiscoveryNetworkConnector.onServiceAdd(...)}} to result in multiple
subsequent concurrent (re)calls, and these concurrent calls can spawn their own
multiple concurrent calls. The result can be an unlimited number of concurrent
calls to {{onServiceAdd(...)}}.
Unit Test
=========
The attached unit test demonstrates this bug by simulating a bridge failure
that has yet to be detected by the remote broker (i.e., before the
{{InactivityMonitor}} closes the connection). The local broker attempts to
re-establish the bridge, but its call to
{{DemandForwardingBridgeSupport.startRemoteBridge()}} fails because the remote
broker rejects the new connection while the old one still exists. Since
{{startRemoteBridge()}} sends multiple messages to the remote broker, multiple
exceptions are generated:
{code:title=DemandForwardingBridgeSupport.java}
protected void startRemoteBridge() throws Exception {
    ...
    remoteBroker.oneway(brokerInfo);
    ...
    remoteBroker.oneway(remoteConnectionInfo);
    ...
    remoteBroker.oneway(producerInfo);
    ...
    remoteBroker.oneway(demandConsumerInfo);
}
{code}
The multiple exceptions result in multiple calls to
{{DemandForwardingBridgeSupport.serviceRemoteException(...)}}, which allows the
first race condition to be exhibited.
The first unit test has a 1s {{reconnectDelay}}, which is sufficient to make
the second race condition improbable; therefore, this test generally passes.
The second unit test has a 0s {{reconnectDelay}}; on my system, this makes the
timing of multiple calls to
{{DemandForwardingBridgeSupport.serviceRemoteException(...)}} such that the
second race condition is reliably exhibited, resulting in the unit test failing
because it detects concurrent calls to
{{DiscoveryNetworkConnector.onServiceAdd(...)}}.
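The attached {{AMQ4159Test.java}} is not reproduced here. As a sketch of one way a test could detect concurrent calls, a hypothetical {{ConcurrencyProbe}} could be wrapped around the body of {{onServiceAdd(...)}} to track the high-water mark of in-flight callers, and the test would then assert that it never exceeds 1. The {{simulate}} helper forces overlapping callers with latches so the probe itself can be checked in isolation.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical probe: call enter()/exit() around the body of onServiceAdd(...). */
public class ConcurrencyProbe {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    public void enter() {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max); // record the high-water mark
    }

    public void exit() {
        inFlight.decrementAndGet();
    }

    public int maxConcurrent() {
        return maxInFlight.get();
    }

    /** Runs `threads` simulated onServiceAdd(...) calls that are forced to overlap. */
    static int simulate(int threads) throws InterruptedException {
        ConcurrencyProbe probe = new ConcurrencyProbe();
        CountDownLatch allInside = new CountDownLatch(threads);
        CountDownLatch release = new CountDownLatch(1);
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                probe.enter();
                try {
                    allInside.countDown();
                    release.await(); // hold every caller inside "onServiceAdd"
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    probe.exit();
                }
            });
            workers[i].start();
        }
        allInside.await();   // all callers have entered and none has exited
        release.countDown();
        for (Thread t : workers) {
            t.join();
        }
        return probe.maxConcurrent();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("max concurrent, 1 caller:  " + simulate(1)); // 1
        System.out.println("max concurrent, 3 callers: " + simulate(3)); // 3
    }
}
```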
Solution
========
While it would be possible to add a {{failed.compareAndSet(false, true)}} guard
clause to {{DemandForwardingBridgeSupport.fireBridgeFailed()}}, which would stop
the first race condition from producing multiple calls to
{{SimpleDiscoveryAgent.serviceFailed(...)}}, the root problem is the race
condition in {{serviceFailed(...)}} itself. This can be trivially addressed by
reconnecting with a copy of the {{DiscoveryEvent}}, so that the {{failed}} guard
on the original event is never reset:
{code:title=Patched SimpleDiscoveryAgent.java}
public void serviceFailed(DiscoveryEvent devent) throws IOException {
    final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent;
    if (sevent.failed.compareAndSet(false, true)) {
        listener.onServiceRemove(sevent);
        taskRunner.execute(new Runnable() {
            public void run() {
                // Reconnect with a copy so the guard on the original event stays set.
                SimpleDiscoveryEvent event = new SimpleDiscoveryEvent(sevent);
                ...
                event.connectTime = System.currentTimeMillis();
                event.failed.set(false);
                listener.onServiceAdd(event);
            }
        }, "Simple Discovery Agent");
    }
}
{code}
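A minimal model of the patched behaviour, offered as a sketch: {{CopiedEventFix}} and {{Event}} are hypothetical stand-ins, and the copy constructor is assumed to carry over the {{failed}} state (consistent with the patch still calling {{event.failed.set(false)}} on the copy). Only the copy is re-armed, so a late duplicate reported against the original event is filtered, while a genuine failure of the new bridge, which is reported against the copy, still triggers a reconnect.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the patched serviceFailed(...) that reconnects with a copy. */
public class CopiedEventFix {
    static class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);
        long connectTime;

        Event() {}

        // Assumed copy constructor, analogous to new SimpleDiscoveryEvent(sevent):
        // the copy gets its own flag, initialised from the original's state.
        Event(Event copy) {
            this.connectTime = copy.connectTime;
            this.failed.set(copy.failed.get());
        }
    }

    final Queue<Runnable> taskRunner = new ArrayDeque<>();
    final AtomicInteger onServiceAddCalls = new AtomicInteger();
    Event lastAdded; // the event handed to the (modelled) listener

    void serviceFailed(Event sevent) {
        if (sevent.failed.compareAndSet(false, true)) {
            taskRunner.add(() -> {
                Event event = new Event(sevent);
                // ... reconnectDelay would elapse here ...
                event.connectTime = System.currentTimeMillis();
                event.failed.set(false);  // only the copy is re-armed
                lastAdded = event;        // models listener.onServiceAdd(event)
                onServiceAddCalls.incrementAndGet();
            });
        }
    }

    void runPendingTasks() {
        while (!taskRunner.isEmpty()) {
            taskRunner.poll().run();
        }
    }

    public static void main(String[] args) {
        CopiedEventFix agent = new CopiedEventFix();
        Event original = new Event();

        agent.serviceFailed(original);  // schedules a reconnect
        agent.serviceFailed(original);  // early duplicate: filtered
        agent.runPendingTasks();
        agent.serviceFailed(original);  // late duplicate: original is still latched
        agent.runPendingTasks();
        System.out.println("after duplicates: " + agent.onServiceAddCalls.get()); // 1

        agent.serviceFailed(agent.lastAdded); // new bridge fails: reported via the copy
        agent.runPendingTasks();
        System.out.println("after real failure: " + agent.onServiceAddCalls.get()); // 2
    }
}
```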
> Race condition in SimpleDiscoveryAgent creates multiple concurrent threads
> attempting to connect to the same bridge --- can result in deadlock
> ----------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-4159
> URL: https://issues.apache.org/jira/browse/AMQ-4159
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.8.0
> Reporter: Stirling Chow
> Assignee: Timothy Bish
> Priority: Critical
> Fix For: 5.8.0
>
> Attachments: AMQ4159.patch, AMQ4159Test.java
>