Stirling Chow created AMQ-4159:
----------------------------------

             Summary: Lack of thread-safety in SimpleDiscoveryAgent can cause multiple concurrent attempts to establish a bridge, which can result in permanent bridge failure
                 Key: AMQ-4159
                 URL: https://issues.apache.org/jira/browse/AMQ-4159
             Project: ActiveMQ
          Issue Type: Bug
    Affects Versions: 5.8.0
            Reporter: Stirling Chow


Symptom
=======
I was diagnosing a deadlock issue in {{DiscoveryNetworkConnector}} and noticed 
that during one of the tests, concurrent calls were being made to 
{{DiscoveryNetworkConnector.onServiceAdd}} for the same {{DiscoveryEvent}}.  
This was unexpected because only a single service (URL) had been given to 
{{SimpleDiscoveryAgent}}.  In one test run I observed dozens of such 
concurrent calls.

Concurrent attempts to establish a bridge to the *same* remote broker are 
problematic because they expose a number of race conditions in 
{{DiscoveryNetworkConnector}} and {{RegionBroker}} that can lead to permanent 
bridge failure.  They also waste thread-pool tasks and other resources and 
generate unnecessary logging.

The race conditions will be filed as separate issues.  This issue specifically 
addresses the bug that causes {{SimpleDiscoveryAgent}} to uncontrollably 
multiply bridge connection attempts.

Cause
=====
When {{DemandForwardingBridgeSupport}} handles exceptions from either the local 
or the remote side of the bridge, it fires a "bridge failed" event:

{code:title=DemandForwardingBridgeSupport.java}
public void serviceLocalException(Throwable error) {
    if (!disposed.get()) {
        LOG.info("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a local error: " + error);
        LOG.debug("The local Exception was:" + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}


public void serviceRemoteException(Throwable error) {
    if (!disposed.get()) {
        if (error instanceof SecurityException || error instanceof GeneralSecurityException) {
            LOG.error("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
        } else {
            LOG.warn("Network connection between " + localBroker + " and " + remoteBroker + " shutdown due to a remote error: " + error);
        }
        }
        LOG.debug("The remote Exception was: " + error, error);
        brokerService.getTaskRunnerFactory().execute(new Runnable() {
            public void run() {
                ServiceSupport.dispose(getControllingService());
            }
        });
        fireBridgeFailed();
    }
}

private void fireBridgeFailed() {
    NetworkBridgeListener l = this.networkBridgeListener;
    if (l != null) {
        l.bridgeFailed();
    }
}
{code}
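Both handlers only *read* {{disposed}}, while the actual dispose call is handed to the task runner and runs later.  A local error and a remote error that both arrive before that task runs therefore each pass the {{!disposed.get()}} check, and {{bridgeFailed}} fires twice for a single bridge.  The deterministic sketch below is a simplified model, not ActiveMQ code: {{BridgeModel}}, the queue-based task runner, and the assumption that disposal is what eventually sets {{disposed}} are illustrative.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class DoubleFireDemo {

    // Simplified model of the two exception handlers above. The names mirror
    // DemandForwardingBridgeSupport, but the bodies are illustrative only.
    static final class BridgeModel {
        final AtomicBoolean disposed = new AtomicBoolean(false);
        // Stand-in for brokerService.getTaskRunnerFactory(): tasks run later.
        final Queue<Runnable> taskRunner = new ArrayDeque<>();
        final AtomicInteger bridgeFailedCalls = new AtomicInteger();

        void serviceLocalException(Throwable error) {
            if (!disposed.get()) {
                // Assumption: disposal is what eventually sets 'disposed'.
                taskRunner.add(() -> disposed.set(true));
                fireBridgeFailed();
            }
        }

        void serviceRemoteException(Throwable error) {
            if (!disposed.get()) {
                taskRunner.add(() -> disposed.set(true));
                fireBridgeFailed();
            }
        }

        private void fireBridgeFailed() {
            bridgeFailedCalls.incrementAndGet();
        }

        void runPendingTasks() {
            Runnable task;
            while ((task = taskRunner.poll()) != null) {
                task.run();
            }
        }
    }

    // A local and a remote error both arrive before the deferred disposal runs.
    static int countBridgeFailedCalls() {
        BridgeModel bridge = new BridgeModel();
        bridge.serviceLocalException(new IOException("local"));
        bridge.serviceRemoteException(new IOException("remote"));
        bridge.runPendingTasks();
        // Once disposal has run, a further error is ignored.
        bridge.serviceRemoteException(new IOException("late"));
        return bridge.bridgeFailedCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("bridgeFailed fired " + countBridgeFailedCalls() + " times");
    }
}
```

Running {{main}} prints {{bridgeFailed fired 2 times}}: the check-then-act on {{disposed}} does not make the "bridge failed" notification a once-only event.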

{{DiscoveryNetworkConnector}} supplies the {{NetworkBridgeListener}} (the 
{{DiscoverNetworkBridgeListener}} inner class below), and its {{bridgeFailed}} 
method calls back to {{SimpleDiscoveryAgent.serviceFailed(...)}}:

{code:title=DiscoveryNetworkConnector.java}
protected NetworkBridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
    class DiscoverNetworkBridgeListener extends MBeanNetworkListener {

        public DiscoverNetworkBridgeListener(BrokerService brokerService, ObjectName connectorName) {
            super(brokerService, connectorName);
        }

        public void bridgeFailed() {
            if (!serviceSupport.isStopped()) {
                try {
                    discoveryAgent.serviceFailed(event);
                } catch (IOException e) {
                }
            }

        }
    }
...
{code}
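{{createBridge}} declares its listener class inside the method and captures {{event}}, so a separate listener is presumably created for each bridge (the part of the method that instantiates it is elided above).  The listener adds no deduplication of its own, so every {{bridgeFailed}} call reaches {{serviceFailed(...)}}.  One way to stop duplicate reports at this layer would be for each listener to forward at most one failure.  The sketch below is hypothetical and is not the ActiveMQ fix: {{BridgeFailedListener}} is a one-method stand-in for {{NetworkBridgeListener}}, and {{OnceOnlyListener}} is an illustrative name.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class OnceOnlyListenerDemo {

    // Hypothetical stand-in for NetworkBridgeListener: only bridgeFailed() is modelled.
    interface BridgeFailedListener {
        void bridgeFailed();
    }

    // Hypothetical wrapper: forwards only the first bridgeFailed() it receives,
    // so duplicate failure reports for one bridge never reach the delegate.
    static final class OnceOnlyListener implements BridgeFailedListener {
        private final AtomicBoolean fired = new AtomicBoolean(false);
        private final BridgeFailedListener delegate;

        OnceOnlyListener(BridgeFailedListener delegate) {
            this.delegate = delegate;
        }

        @Override
        public void bridgeFailed() {
            // compareAndSet is atomic, so concurrent callers cannot both pass.
            if (fired.compareAndSet(false, true)) {
                delegate.bridgeFailed();
            }
        }
    }

    // Sends 'reports' failure notifications and counts how many are forwarded.
    static int forwardedCalls(int reports) {
        AtomicInteger serviceFailedCalls = new AtomicInteger();
        BridgeFailedListener listener =
                new OnceOnlyListener(serviceFailedCalls::incrementAndGet);
        for (int i = 0; i < reports; i++) {
            listener.bridgeFailed();
        }
        return serviceFailedCalls.get();
    }

    public static void main(String[] args) {
        System.out.println(forwardedCalls(3)); // 3 reports, 1 forwarded
    }
}
```

{{compareAndSet}} is used rather than a {{get()}} followed by a {{set()}} because only the atomic form guarantees that exactly one of several concurrent callers proceeds; a plain read-then-act would leave the same gap as the {{!disposed.get()}} check in {{DemandForwardingBridgeSupport}}.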

In response, {{SimpleDiscoveryAgent.serviceFailed(...)}} pauses for the 
{{reconnectDelay}} before attempting to re-establish the bridge via 
{{DiscoveryNetworkConnector.onServiceAdd(...)}}:

{code:title=SimpleDiscoveryAgent.java}
public void serviceFailed(DiscoveryEvent devent) throws IOException {

    final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
    if (event.failed.compareAndSet(false, true)) {

        listener.onServiceRemove(event);
        taskRunner.execute(new Runnable() {
            public void run() {
                // We detect a failed connection attempt because the service
                // fails right away.
                if (event.connectTime + minConnectTime > System.currentTimeMillis()) {
                    LOG.debug("Failure occurred soon after the discovery event was generated.  It will be classified as a connection failure: "+event);
...
                    synchronized (sleepMutex) {
                        try {
                            if (!running.get()) {
                                LOG.debug("Reconnecting disabled: stopped");
                                return;
                            }

                            LOG.debug("Waiting "+event.reconnectDelay+" ms before attempting to reconnect.");
                            sleepMutex.wait(event.reconnectDelay);
                        } catch (InterruptedException ie) {
                            LOG.debug("Reconnecting disabled: " + ie);
                            Thread.currentThread().interrupt();
                            return;
                        }
                    }
...
                event.connectTime = System.currentTimeMillis();
                event.failed.set(false);
                listener.onServiceAdd(event);
            }
        }, "Simple Discovery Agent");
    }
}
{code}
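The {{compareAndSet}} guard only suppresses duplicate reports while {{failed}} is {{true}}.  The reconnect task resets {{failed}} and passes the *same* event object back to {{onServiceAdd}}, and the listener built in {{createBridge}} holds on to that event.  A failure report from the old bridge that is delivered after the reset is therefore accepted as a new failure and schedules a second reconnect.  The deterministic sketch below models that interleaving under stated assumptions: {{Event}}, {{AgentModel}} and the queue-based task runner are illustrative stand-ins, the reconnect delay and {{onServiceRemove}} are not modelled, and the late report is assumed to arrive after the first reconnect task has run.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

public class ReconnectMultiplyDemo {

    // Stand-in for SimpleDiscoveryEvent: only the 'failed' flag is modelled.
    static final class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);
    }

    // Simplified model of SimpleDiscoveryAgent.serviceFailed(...). The delay,
    // connectTime checks, onServiceRemove and logging are omitted; queued
    // tasks run only when runNextTask() is called.
    static final class AgentModel {
        final Queue<Runnable> taskRunner = new ArrayDeque<>();
        int reconnectTasksScheduled;
        int onServiceAddCalls;

        void serviceFailed(Event event) {
            if (event.failed.compareAndSet(false, true)) {
                reconnectTasksScheduled++;
                taskRunner.add(() -> {
                    // The real task waits reconnectDelay here.
                    event.failed.set(false);
                    onServiceAddCalls++; // models listener.onServiceAdd(event)
                });
            }
        }

        void runNextTask() {
            Runnable task = taskRunner.poll();
            if (task != null) {
                task.run();
            }
        }
    }

    static AgentModel simulate() {
        AgentModel agent = new AgentModel();
        Event event = new Event();   // one service, one shared event object
        agent.serviceFailed(event);  // first report: reconnect scheduled
        agent.serviceFailed(event);  // duplicate while failed == true: ignored
        agent.runNextTask();         // reconnect: flag reset, onServiceAdd
        agent.serviceFailed(event);  // late report from the old bridge: accepted
        agent.runNextTask();         // second, redundant reconnect
        return agent;
    }

    public static void main(String[] args) {
        AgentModel agent = simulate();
        System.out.println("reconnect tasks: " + agent.reconnectTasksScheduled
                + ", onServiceAdd calls: " + agent.onServiceAddCalls);
    }
}
```

Running {{main}} prints {{reconnect tasks: 2, onServiceAdd calls: 2}}: one bridge failure for one service produces two bridge-establishment attempts.  In the real agent each of those bridges can in turn fail and report more than once, which is one way the number of concurrent attempts could keep growing.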

