[ 
https://issues.apache.org/jira/browse/AMQ-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Stirling Chow updated AMQ-4159:
-------------------------------

    Description: 
Symptom
=======
I was diagnosing a deadlock issue in {{DiscoveryNetworkConnector}} and noticed 
that during one of the tests, concurrent calls were being made to 
{{DiscoveryNetworkConnector.onServiceAdd(...)}} for the same 
{{DiscoveryEvent}}.  This was unexpected because only a single service (URL) 
had been given to {{SimpleDiscoveryAgent}}.  In fact, during one of the tests I 
observed dozens of concurrent calls.

Concurrent attempts to establish a bridge to the *same* remote broker are 
problematic because they expose a number of race conditions in 
{{DiscoveryNetworkConnector}} and {{RegionBroker}} that can lead to permanent 
bridge failure (see AMQ-4160), as well as unnecessary thread pool 
execution/resource usage and logging.

The issues with {{DiscoveryNetworkConnector}} and {{RegionBroker}} will be 
filed as separate issues.  This issue specifically addresses the bug that 
causes {{SimpleDiscoveryAgent}} to uncontrollably multiply bridge connection 
attempts.

Cause
=====
When {{DemandForwardingBridgeSupport}} handles exceptions from either the local 
or remote side of the bridge, it fires a "bridge failed" event:

{code:title=DemandForwardingBridgeSupport.java}
public void serviceLocalException(Throwable error) {
    if (!disposed.get()) {
        LOG.info("Network connection between " + localBroker + " and "
                + remoteBroker + " shutdown due to a local error: " + error);
        LOG.debug("The local Exception was:" + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}


public void serviceRemoteException(Throwable error) {
    if (!disposed.get()) {
        if (error instanceof SecurityException
                || error instanceof GeneralSecurityException) {
            LOG.error("Network connection between " + localBroker + " and "
                    + remoteBroker + " shutdown due to a remote error: " + error);
        } else {
            LOG.warn("Network connection between " + localBroker + " and "
                    + remoteBroker + " shutdown due to a remote error: " + error);
        }
        LOG.debug("The remote Exception was: " + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}

private void fireBridgeFailed() {
    NetworkBridgeListener l = this.networkBridgeListener;
    if (l != null) {
        l.bridgeFailed();
    }
}
{code}

{{DiscoveryNetworkConnector}} is the {{NetworkBridgeListener}}, and its 
{{bridgeFailed()}} method calls back to 
{{SimpleDiscoveryAgent.serviceFailed(...)}}:

{code:title=DiscoveryNetworkConnector.java}
protected NetworkBridge createBridge(Transport localTransport,
        Transport remoteTransport, final DiscoveryEvent event) {
    class DiscoverNetworkBridgeListener extends MBeanNetworkListener {

        public DiscoverNetworkBridgeListener(BrokerService brokerService,
                ObjectName connectorName) {
            super(brokerService, connectorName);
        }

        public void bridgeFailed() {
            if (!serviceSupport.isStopped()) {
                try {
                    discoveryAgent.serviceFailed(event);
                } catch (IOException e) {
                }
            }

        }
    }
...
{code}

In response, {{SimpleDiscoveryAgent.serviceFailed(...)}} pauses for the 
{{reconnectDelay}} before attempting to re-establish the bridge via 
{{DiscoveryNetworkConnector.onServiceAdd(...)}}:

{code:title=SimpleDiscoveryAgent.java}
public void serviceFailed(DiscoveryEvent devent) throws IOException {

    final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
    if (event.failed.compareAndSet(false, true)) {

        listener.onServiceRemove(event);
        taskRunner.execute(new Runnable() {
            public void run() {
                // We detect a failed connection attempt because the service
                // fails right away.
                if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
                    LOG.debug("Failure occurred soon after the discovery event was generated.  "
                            + "It will be classified as a connection failure: " + event);
...
                    synchronized (sleepMutex) {
                        try {
                            if (!running.get()) {
                                LOG.debug("Reconnecting disabled: stopped");
                                return;
                            }

                            LOG.debug("Waiting " + event.reconnectDelay
                                    + " ms before attempting to reconnect.");
                            sleepMutex.wait(event.reconnectDelay);
                        } catch (InterruptedException ie) {
                            LOG.debug("Reconnecting disabled: " + ie);
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
...
                event.connectTime = System.currentTimeMillis();
                event.failed.set(false);
                listener.onServiceAdd(event);
            }
        }, "Simple Discovery Agent");
    }
}
{code}

*NOTE*: the call to {{listener.onServiceAdd(...)}} is made by a new thread!

There are two race conditions that allow 
{{SimpleDiscoveryAgent.serviceFailed(...)}} to launch more than one thread, 
each attempting to re-establish the same bridge.

First, note that 
{{DemandForwardingBridgeSupport.serviceLocal/RemoteException(...)}} launches a 
separate thread that stops the bridge:

{code:title=DemandForwardingBridgeSupport.java}
public void serviceRemoteException(Throwable error) {
    if (!disposed.get()) {
        if (error instanceof SecurityException
                || error instanceof GeneralSecurityException) {
            LOG.error("Network connection between " + localBroker + " and "
                    + remoteBroker + " shutdown due to a remote error: " + error);
        } else {
            LOG.warn("Network connection between " + localBroker + " and "
                    + remoteBroker + " shutdown due to a remote error: " + error);
        }
        LOG.debug("The remote Exception was: " + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}

public void stop() throws Exception {
    if (started.compareAndSet(true, false)) {
        if (disposed.compareAndSet(false, true)) {
            LOG.debug(" stopping " + configuration.getBrokerName() + " bridge to "
                    + remoteBrokerName);
            NetworkBridgeListener l = this.networkBridgeListener;
            if (l != null) {
                l.onStop(this);
            }
...
{code}

When the bridge stops, the {{disposed}} flag is set, which prevents subsequent 
calls to {{serviceLocal/RemoteException(...)}} from calling 
{{fireBridgeFailed()}}.  However, since the call to 
{{DemandForwardingBridgeSupport.stop()}} is made by a separate thread, multiple 
{{serviceLocal/RemoteException(...)}} calls that are made in quick succession 
can result in multiple calls to {{fireBridgeFailed()}}.

This is the first race condition: multiple calls can be made to 
{{DiscoveryNetworkConnector.bridgeFailed()}} for the same bridge.  By 
transitivity, this results in multiple calls to 
{{SimpleDiscoveryAgent.serviceFailed(...)}}.
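The first race can be reproduced without a broker. The sketch below is a hypothetical, heavily simplified model, not ActiveMQ code: {{BridgeSketch}}, {{deferred}}, {{failureReported}} and {{simulate(...)}} are invented names, and the task runner is replaced by a queue that is drained by hand so the interleaving is deterministic. Each failed send reaches the {{disposed}} check before the deferred dispose task has run, so every one of them reports a failure. Passing {{oneShotGuard = true}} models the alternative fix mentioned in the Solution section, a {{compareAndSet}} guard in {{fireBridgeFailed()}}.

{code:java}
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the first race; NOT the real DemandForwardingBridgeSupport.
class BridgeSketch {
    // Stand-in for the task runner: dispose tasks are queued and only run
    // when drained explicitly, i.e. "later, on another thread".
    private final Queue<Runnable> deferred = new ArrayDeque<>();
    private final AtomicBoolean disposed = new AtomicBoolean(false);
    private final AtomicBoolean failureReported = new AtomicBoolean(false);
    private final AtomicInteger bridgeFailedCalls = new AtomicInteger();
    private final boolean oneShotGuard;

    BridgeSketch(boolean oneShotGuard) {
        this.oneShotGuard = oneShotGuard;
    }

    // Same shape as serviceRemoteException(...): check, schedule dispose, report.
    void serviceRemoteException(Throwable error) {
        if (!disposed.get()) {
            deferred.add(() -> disposed.set(true));
            fireBridgeFailed();
        }
    }

    private void fireBridgeFailed() {
        // Optional one-shot guard: only the first caller reaches the listener.
        if (oneShotGuard && !failureReported.compareAndSet(false, true)) {
            return;
        }
        bridgeFailedCalls.incrementAndGet(); // stands in for l.bridgeFailed()
    }

    // Models startRemoteBridge(): several oneway() sends fail in a row.
    static int simulate(boolean oneShotGuard, int failedSends) {
        BridgeSketch bridge = new BridgeSketch(oneShotGuard);
        for (int i = 0; i < failedSends; i++) {
            bridge.serviceRemoteException(new IOException("oneway " + i + " failed"));
        }
        // The dispose tasks run only now, too late to filter the duplicates.
        Runnable task;
        while ((task = bridge.deferred.poll()) != null) {
            task.run();
        }
        return bridge.bridgeFailedCalls.get();
    }
}
{code}

With four failed sends, {{simulate(false, 4)}} returns 4 and {{simulate(true, 4)}} returns 1.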

{{SimpleDiscoveryAgent.serviceFailed(...)}} has a guard clause, 
{{event.failed.compareAndSet(false, true)}}, which should only allow the first 
call to launch a bridge reconnect thread.  However, once the {{reconnectDelay}} 
expires, {{event.failed}} is reset to {{false}}, which allows re-entry to the 
failure handling logic, and the possible launching of additional bridge 
reconnect threads if the {{reconnectDelay}} is short or the threads calling 
{{serviceFailed(...)}} are delayed.

This is the second race condition: the guard clause in 
{{SimpleDiscoveryAgent.serviceFailed(...)}} can be reset before the subsequent 
redundant calls have been filtered out.
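The second race can be modelled the same way. This is again a hypothetical sketch rather than the real {{SimpleDiscoveryAgent}}: {{UnpatchedAgentSketch}}, {{Event}} and {{simulate()}} are invented, and the wait for {{reconnectDelay}} is represented by the moment the queued reconnect task is run. Because the reconnect task resets {{failed}} on the very object that later duplicate calls pass in, a delayed duplicate is accepted as a brand-new failure.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the unpatched guard in serviceFailed(...).
class UnpatchedAgentSketch {
    static class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);
    }

    private final Queue<Runnable> taskRunner = new ArrayDeque<>();
    private final AtomicInteger reconnectsLaunched = new AtomicInteger();

    void serviceFailed(Event event) {
        if (event.failed.compareAndSet(false, true)) {
            reconnectsLaunched.incrementAndGet();
            taskRunner.add(() -> {
                // ... reconnectDelay has elapsed by the time this runs ...
                event.failed.set(false); // reopens the guard on the SAME object
                // listener.onServiceAdd(event) would be called here
            });
        }
    }

    static int simulate() {
        UnpatchedAgentSketch agent = new UnpatchedAgentSketch();
        Event event = new Event();
        agent.serviceFailed(event);    // first bridgeFailed(): accepted
        agent.taskRunner.poll().run(); // delay expires, guard is reset
        agent.serviceFailed(event);    // late duplicate: accepted again
        return agent.reconnectsLaunched.get();
    }
}
{code}

Here {{simulate()}} returns 2: one bridge failure has produced two reconnect attempts, and each of those can fail in turn and repeat the pattern.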

These two race conditions allow a single call to 
{{DiscoveryNetworkConnector.onServiceAdd(...)}} to result in multiple 
subsequent concurrent (re)calls, and these concurrent calls can spawn their own 
multiple concurrent calls.  The result can be an unlimited number of concurrent 
calls to {{onServiceAdd(...)}}.

Unit Test
=========
The attached unit test demonstrates this bug by simulating a bridge failure 
that has yet to be detected by the remote broker (i.e., before the 
{{InactivityMonitor}} closes the connection).  The local broker attempts to 
re-establish the bridge, but its call to 
{{DemandForwardingBridgeSupport.startRemoteBridge()}} fails because the remote 
broker rejects the new connection since the old one still exists.  Since 
{{startRemoteBridge()}} sends multiple messages to the remote broker, multiple 
exceptions are generated:

{code:title=DemandForwardingBridgeSupport.java}
protected void startRemoteBridge() throws Exception {
...
                remoteBroker.oneway(brokerInfo);
...
            remoteBroker.oneway(remoteConnectionInfo);
...
            remoteBroker.oneway(producerInfo);
...
                remoteBroker.oneway(demandConsumerInfo);
}
{code}

The multiple exceptions result in multiple calls to 
{{DemandForwardingBridgeSupport.serviceRemoteException(...)}}, which allows the 
first race condition to be exhibited.  

The first unit test has a 1s {{reconnectDelay}}, which is sufficient to make 
the second race condition improbable; therefore, this test generally passes. 

The second unit test has a 0s {{reconnectDelay}}; on my system, this makes the 
timing of multiple calls to 
{{DemandForwardingBridgeSupport.serviceRemoteException(...)}} such that the 
second race condition is reliably exhibited, resulting in the unit test failing 
because it detects concurrent calls to 
{{DiscoveryNetworkConnector.onServiceAdd(...)}}.
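One possible way for a test to detect overlapping calls is to record how many threads are inside the method at once. The helper below is hypothetical and not necessarily how the attached test works; it could, for example, wrap the body of an overriding {{onServiceAdd(...)}} in a test subclass (assuming that method can be overridden in the version under test), with the test then asserting that {{maxConcurrent()}} never exceeds 1.

{code:java}
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper: tracks the peak number of threads inside a section.
class ConcurrencyProbe {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final AtomicInteger maxInFlight = new AtomicInteger();

    // Runs body while counting it as "in flight"; the count is decremented
    // even if body throws.
    void run(Runnable body) {
        int now = inFlight.incrementAndGet();
        maxInFlight.accumulateAndGet(now, Math::max);
        try {
            body.run();
        } finally {
            inFlight.decrementAndGet();
        }
    }

    int maxConcurrent() {
        return maxInFlight.get();
    }
}
{code}

Sequential calls leave {{maxConcurrent()}} at 1; any overlap, whether from two threads or re-entry, pushes it to 2 or more.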

Solution
========
While it would be possible to add a {{failed.compareAndSet(false,true)}} guard 
clause to {{DemandForwardingBridgeSupport.fireBridgeFailed()}} to prevent the 
first race condition from producing multiple calls to 
{{SimpleDiscoveryAgent.serviceFailed()}}, the root problem is the race 
condition in {{serviceFailed}} itself.  It can be addressed trivially by making 
a copy of the {{DiscoveryEvent}}, which prevents the original {{event.failed}} 
guard clause from being reset:

{code:title=Patched SimpleDiscoveryAgent.java}
public void serviceFailed(DiscoveryEvent devent) throws IOException {

    final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent;
    if (sevent.failed.compareAndSet(false, true)) {

        listener.onServiceRemove(sevent);
        taskRunner.execute(new Runnable() {
            public void run() {
                SimpleDiscoveryEvent event = new SimpleDiscoveryEvent(sevent);

...
                event.connectTime = System.currentTimeMillis();
                event.failed.set(false);
                listener.onServiceAdd(event);
            }
        }, "Simple Discovery Agent");
    }
}
{code} 
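The effect of the copy can be checked with the same kind of model. This is a hypothetical sketch, not the patched ActiveMQ class: {{PatchedAgentSketch}}, its {{Event}} copy constructor and {{simulate()}} are invented for illustration, and the copy constructor only mimics the role of {{new SimpleDiscoveryEvent(sevent)}}. The reconnect task resets {{failed}} on the copy handed to the new bridge, so the original event stays marked as failed and late duplicates for the old bridge are dropped, while a genuine failure of the new bridge, reported against the copy, still triggers a reconnect.

{code:java}
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the patched serviceFailed(...).
class PatchedAgentSketch {
    static class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);
        long connectTime;

        Event() {
        }

        // Copy constructor, playing the role of new SimpleDiscoveryEvent(sevent).
        Event(Event copy) {
            this.connectTime = copy.connectTime;
            this.failed.set(copy.failed.get());
        }
    }

    private final Queue<Runnable> taskRunner = new ArrayDeque<>();
    private final AtomicInteger reconnectsLaunched = new AtomicInteger();
    private Event lastAdded; // the event the new bridge was created with

    void serviceFailed(Event sevent) {
        if (sevent.failed.compareAndSet(false, true)) {
            reconnectsLaunched.incrementAndGet();
            taskRunner.add(() -> {
                Event event = new Event(sevent); // fresh object for the new bridge
                event.connectTime = System.currentTimeMillis();
                event.failed.set(false);         // resets the copy, not sevent
                lastAdded = event;               // stands in for listener.onServiceAdd(event)
            });
        }
    }

    // Returns {launches after a late duplicate, launches after a new-bridge failure}.
    static int[] simulate() {
        PatchedAgentSketch agent = new PatchedAgentSketch();
        Event original = new Event();
        agent.serviceFailed(original);        // first bridgeFailed(): accepted
        agent.taskRunner.poll().run();        // delay expires, new bridge gets a copy
        agent.serviceFailed(original);        // late duplicate: filtered out
        int afterDuplicate = agent.reconnectsLaunched.get();
        agent.serviceFailed(agent.lastAdded); // the new bridge fails later
        int afterNewFailure = agent.reconnectsLaunched.get();
        return new int[] {afterDuplicate, afterNewFailure};
    }
}
{code}

{{simulate()}} returns {{[1, 2]}}: the late duplicate no longer launches a second reconnect, but a later failure of the new bridge still does.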

> Race condition in SimpleDiscoveryAgent creates multiple concurrent threads 
> attempting to connect to the same bridge --- can result in deadlock
> ----------------------------------------------------------------------------------------------------------------------------------------------
>
>                 Key: AMQ-4159
>                 URL: https://issues.apache.org/jira/browse/AMQ-4159
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.8.0
>            Reporter: Stirling Chow
>            Assignee: Timothy Bish
>            Priority: Critical
>             Fix For: 5.8.0
>
>         Attachments: AMQ4159.patch, AMQ4159Test.java
>
