bridge reconnection stops because of race in SimpleDiscoveryAgent
-----------------------------------------------------------------

                 Key: AMQ-1855
                 URL: https://issues.apache.org/activemq/browse/AMQ-1855
             Project: ActiveMQ
          Issue Type: Bug
          Components: Connector
    Affects Versions: 4.1.2
            Reporter: Mario Lukica


I believe there is a race condition in SimpleDiscoveryAgent which can cause a 
subsequent bridge restart to fail without starting the new thread that should 
restart the bridge. As a consequence, the network bridge is never restarted.

The following scenario leads to this:
1. The bridge is disconnected (e.g. local error: 
org.apache.activemq.transport.InactivityIOException: Channel was inactive for 
too long).
2. The bridge is disposed in a separate thread in 
DemandForwardingBridge.serviceLocalException.
3. SimpleDiscoveryAgent.serviceFailed is called, which starts another thread 
that calls DiscoveryNetworkConnector.onServiceAdd to restart the bridge.
4. Bridge startup can fail with javax.jms.InvalidClientIDException: Broker: 
some_broker2 - Client: NC_some_broker1_inboundlocalhost already connected. This 
is caused by a race with the thread disposing the bridge (step 2), which should 
have removed that client subscription first.
5. This causes an invocation of DemandForwardingBridge.serviceLocalException 
(this call can be made asynchronously, while the previous bridge startup is 
still in progress).

As a consequence, multiple threads can end up calling 
SimpleDiscoveryAgent.serviceFailed simultaneously.

serviceFailed will call DiscoveryNetworkConnector.onServiceAdd, which will try 
to reconnect the bridge. The reconnect logic is guarded by 
if( event.failed.compareAndSet(false, true) ) 

which tries to ensure that only a single thread is reconnecting the bridge at 
any given time.
{code}
    public void serviceFailed(DiscoveryEvent devent) throws IOException {

        final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent) devent;
        if (event.failed.compareAndSet(false, true)) {

            listener.onServiceRemove(event);
            Thread thread = new Thread() {
                public void run() {

                    // We detect a failed connection attempt because the service fails right
                    // away.
                    if (event.connectTime + minConnectTime > System.currentTimeMillis()) {

                        event.connectFailures++;

                        if (maxReconnectAttempts > 0 && event.connectFailures >= maxReconnectAttempts) {
                            // Don't try to re-connect
                            return;
                        }

                        synchronized (sleepMutex) {
                            try {
                                if (!running.get())
                                    return;

                                sleepMutex.wait(event.reconnectDelay);
                            } catch (InterruptedException ie) {
                                Thread.currentThread().interrupt();
                                return;
                            }
                        }

                        if (!useExponentialBackOff) {
                            event.reconnectDelay = initialReconnectDelay;
                        } else {
                            // Exponential increment of reconnect delay.
                            event.reconnectDelay *= backOffMultiplier;
                            if (event.reconnectDelay > maxReconnectDelay)
                                event.reconnectDelay = maxReconnectDelay;
                        }

                    } else {
                        event.connectFailures = 0;
                        event.reconnectDelay = initialReconnectDelay;
                    }

                    if (!running.get())
                        return;

                    event.connectTime = System.currentTimeMillis();
                    event.failed.set(false);

                    listener.onServiceAdd(event);
                }
            };
            thread.setDaemon(true);
            thread.start();
        }
    }
{code}

Before calling DiscoveryNetworkConnector.onServiceAdd, the first thread (T1) 
sets event.failed to false. Another thread (T2) can then enter the block guarded 
by if( event.failed.compareAndSet(false, true) ) while the reconnect started by 
T1 is still in progress. T2 can satisfy the condition if( event.connectTime 
+ minConnectTime > System.currentTimeMillis()  ) and will enter 
sleepMutex.wait(event.reconnectDelay) while still holding event.failed == true, 
so every other call to serviceFailed returns without starting a thread to 
reconnect the bridge.
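
The interleaving described above can be replayed deterministically on a single 
thread. This is only a minimal sketch: the class FailedFlagRaceSketch and its 
methods are hypothetical stand-ins, and the static AtomicBoolean merely plays 
the role of SimpleDiscoveryEvent.failed. It is not ActiveMQ code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Replays the T1/T2 interleaving step by step (all names are illustrative). */
final class FailedFlagRaceSketch {

    /** Stand-in for SimpleDiscoveryEvent.failed. */
    static final AtomicBoolean failed = new AtomicBoolean(false);

    /** Mirrors the guard in serviceFailed: true means a reconnect thread would be started. */
    static boolean tryEnterServiceFailed() {
        return failed.compareAndSet(false, true);
    }

    /** Returns, in order, whether each of the three serviceFailed calls passed the guard. */
    static boolean[] replay() {
        failed.set(false);

        // T1: first failure is reported, guard is passed, reconnect thread starts.
        boolean t1Entered = tryEnterServiceFailed();

        // T1: resets the flag just before calling onServiceAdd.
        failed.set(false);

        // T2: a second failure arrives while T1 is still inside onServiceAdd.
        // T2 passes the guard and then sleeps, leaving the flag set to true.
        boolean t2Entered = tryEnterServiceFailed();

        // T1: its onServiceAdd fails and serviceFailed is called again,
        // but the guard rejects the call because T2 still holds the flag.
        boolean t1RetryEntered = tryEnterServiceFailed();

        return new boolean[] { t1Entered, t2Entered, t1RetryEntered };
    }

    public static void main(String[] args) {
        boolean[] r = replay();
        System.out.println("T1 entered:       " + r[0]); // true
        System.out.println("T2 entered:       " + r[1]); // true
        System.out.println("T1 retry entered: " + r[2]); // false
    }
}
```

The third call is the one that would have cleaned up and rescheduled the 
reconnect; in this replay it is silently dropped.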

If T1 fails to reconnect the bridge (e.g. because of the 
InvalidClientIDException described in step 4), its call to serviceFailed will 
not schedule a new thread to restart the bridge, and will not call 
DiscoveryNetworkConnector.onServiceRemove to clean up 
DiscoveryNetworkConnector.bridges, because event.failed == true while T2 is 
still waiting (5 sec by default). When T2 wakes up from the wait, it will try to 
restart the bridge and fail because of the following condition in 
DiscoveryNetworkConnector:
{code}
            if (bridges.containsKey(uri)
                    || localURI.equals(uri)
                    || (connectionFilter != null && !connectionFilter.connectTo(uri)))
                return;
{code}

bridges.containsKey(uri) will be true (T1 added the entry during its 
unsuccessful attempt to reconnect the bridge), so T2 will return from 
DiscoveryNetworkConnector.onServiceAdd without starting the bridge. 
No further attempt to reconnect the bridge will be made: while T2 held 
event.failed == true, the SimpleDiscoveryAgent.serviceFailed calls from other 
threads processing local or remote bridge exceptions were ignored.

End result:
- DiscoveryNetworkConnector.bridges contains a bridge that is disposed, which 
prevents all other attempts to restart the bridge (onServiceAdd always returns 
because bridges.containsKey(uri) == true) 
- SimpleDiscoveryAgent doesn't try to reconnect the bridge (T2 was the last 
attempt, and it returned without restarting the bridge; 
SimpleDiscoveryAgent.serviceFailed is not called again, since the bridge is not 
started)
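
The stuck state in the first point can be illustrated with a small model of the 
bridges map. This is a sketch under assumptions: StaleBridgeEntrySketch is a 
hypothetical class that mirrors only the containsKey guard of onServiceAdd and 
the cleanup done by onServiceRemove, with plain strings in place of real URIs 
and bridge objects.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Models only the bridges map and its guard (all names are illustrative). */
final class StaleBridgeEntrySketch {

    /** Stand-in for DiscoveryNetworkConnector.bridges. */
    static final Map<String, String> bridges = new ConcurrentHashMap<>();

    /** Returns true if a bridge start would be attempted for this uri. */
    static boolean onServiceAdd(String uri) {
        if (bridges.containsKey(uri)) {
            return false; // early return, as in the guard quoted above
        }
        bridges.put(uri, "bridge for " + uri);
        return true;
    }

    /** Removes the entry, which is what lets a later onServiceAdd proceed. */
    static void onServiceRemove(String uri) {
        bridges.remove(uri);
    }

    /** Replays: T1 adds an entry and fails, cleanup is skipped, T2 is rejected. */
    static boolean[] replay() {
        bridges.clear();
        String uri = "tcp://some_broker2:61616"; // hypothetical remote broker address

        boolean t1Attempted = onServiceAdd(uri);  // T1 registers the bridge, startup then fails
        // onServiceRemove is never reached because serviceFailed rejected T1's call.
        boolean t2Attempted = onServiceAdd(uri);  // T2 wakes up and hits the stale entry

        onServiceRemove(uri);                     // what the skipped cleanup would have done
        boolean afterCleanup = onServiceAdd(uri); // a restart is possible again

        return new boolean[] { t1Attempted, t2Attempted, afterCleanup };
    }

    public static void main(String[] args) {
        boolean[] r = replay();
        System.out.println("T1 attempted start:  " + r[0]); // true
        System.out.println("T2 attempted start:  " + r[1]); // false
        System.out.println("after cleanup start: " + r[2]); // true
    }
}
```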

I think that the synchronization of threads processing bridge exceptions and 
entering SimpleDiscoveryAgent.serviceFailed should be verified and/or improved.
Also, InvalidClientIDException is relatively common (at least on multicore 
machines, e.g. Solaris T2000). Maybe ConduitBridge.serviceLocalException (which 
starts another thread doing 
ServiceSupport.dispose(DemandForwardingBridgeSupport.this)) should be changed 
to wait a bit for bridge disposal to finish (e.g. sleep for some time) and then 
try to restart the bridge. Waiting a second longer to restart a bridge is 
better than not starting it at all.
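
One way the "wait for disposal, then restart" idea might look is sketched 
below. Note that it swaps the fixed sleep suggested above for a bounded wait on 
a CountDownLatch, so the restart proceeds as soon as disposal finishes. The 
class DisposeThenRestartSketch and all of its methods are hypothetical; this is 
not how ActiveMQ implements bridge disposal, only an illustration of the 
suggestion.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper, not part of ActiveMQ: restart only after disposal completes. */
final class DisposeThenRestartSketch {

    /** Released once the asynchronous dispose work has finished. */
    private final CountDownLatch disposed = new CountDownLatch(1);

    /** Runs the dispose work on a daemon thread and signals completion. */
    void disposeAsync(Runnable disposeWork) {
        Thread t = new Thread(() -> {
            try {
                disposeWork.run();
            } finally {
                disposed.countDown(); // signal even if dispose throws
            }
        });
        t.setDaemon(true);
        t.start();
    }

    /** Waits up to timeoutMillis for disposal; true means it is safe to restart. */
    boolean awaitDisposed(long timeoutMillis) {
        try {
            return disposed.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    /** Simulates a dispose taking disposeMillis and a restart willing to wait timeoutMillis. */
    static boolean restartAfterDispose(long disposeMillis, long timeoutMillis) {
        DisposeThenRestartSketch bridge = new DisposeThenRestartSketch();
        bridge.disposeAsync(() -> sleepQuietly(disposeMillis));
        return bridge.awaitDisposed(timeoutMillis);
    }

    private static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("restart allowed: " + restartAfterDispose(50, 5000));
    }
}
```

If the wait times out, the caller would still have to decide whether to retry 
later or restart anyway; the sketch leaves that policy open.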

I've seen this problem in 4.1.0 and 4.1.2, but I think it can also occur in 5.1 
and 5.2 trunk (SimpleDiscoveryAgent.serviceFailed and 
DiscoveryNetworkConnector.onServiceAdd are more or less the same, just using 
ASYNC_TASKS to execute asynchronous calls instead of starting new threads 
directly).
