[
https://issues.apache.org/jira/browse/AMQ-4159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13496681#comment-13496681
]
Stirling Chow commented on AMQ-4159:
------------------------------------
AMQ-4159 stops the network connector and restarts it. I just patched AMQ-4160
to address an issue where activeEvents was not being cleared on
DiscoveryNetworkConnector.handleStop --- as a result, the restart would be
ignored. This could have caused the hanging test, although I believe it
occurred prior to the change for AMQ-4160 that would have caused the restart to
be ignored.
I've periodically seen bridge formation stop when the network connector is
stopped and restarted in quick succession. I'm continuing to investigate.
> Race condition in SimpleDiscoveryAgent creates multiple concurrent threads
> attempting to connect to the same bridge --- can result in deadlock
> ----------------------------------------------------------------------------------------------------------------------------------------------
>
> Key: AMQ-4159
> URL: https://issues.apache.org/jira/browse/AMQ-4159
> Project: ActiveMQ
> Issue Type: Bug
> Affects Versions: 5.8.0
> Reporter: Stirling Chow
> Assignee: Timothy Bish
> Priority: Critical
> Fix For: 5.8.0
>
> Attachments: AMQ4159.patch, AMQ4159Test.java
>
>
> Symptom
> =======
> I was diagnosing a deadlock issue in {{DiscoveryNetworkConnector}} and
> noticed that during one of the tests, concurrent calls were being made to
> {{DiscoveryNetworkConnector.onServiceAdd(...)}} for the same
> {{DiscoveryEvent}}. This was unexpected because only a single service (URL)
> had been given to {{SimpleDiscoveryAgent}}. In fact, during one of the tests
> I observed dozens of concurrent calls.
> Concurrent attempts to establish a bridge to the *same* remote broker are
> problematic because they expose a number of race conditions in
> {{DiscoveryNetworkConnector}} and {{RegionBroker}} that can lead to permanent
> bridge failure (see AMQ-4160), as well as unnecessary thread pool
> execution/resource usage and logging.
> The issues with {{DiscoveryNetworkConnector}} and {{RegionBroker}} will be
> filed as separate issues. This issue specifically addresses the bug that
> causes {{SimpleDiscoveryAgent}} to uncontrollably multiply bridge connection
> attempts.
> Cause
> =====
> When {{DemandForwardingBridgeSupport}} handles exceptions from either the
> local or remote sides of the bridge, it fires a "bridge failed" event:
> {code:title=DemandForwardingBridgeSupport.java}
> public void serviceLocalException(Throwable error) {
>     if (!disposed.get()) {
>         LOG.info("Network connection between " + localBroker + " and " +
>             remoteBroker + " shutdown due to a local error: " + error);
>         LOG.debug("The local Exception was:" + error, error);
>         brokerService.getTaskRunnerFactory().execute(new Runnable() {
>             public void run() {
>                 ServiceSupport.dispose(getControllingService());
>             }
>         });
>         fireBridgeFailed();
>     }
> }
> public void serviceRemoteException(Throwable error) {
>     if (!disposed.get()) {
>         if (error instanceof SecurityException || error instanceof
>             GeneralSecurityException) {
>             LOG.error("Network connection between " + localBroker + " and " +
>                 remoteBroker + " shutdown due to a remote error: " + error);
>         } else {
>             LOG.warn("Network connection between " + localBroker + " and " +
>                 remoteBroker + " shutdown due to a remote error: " + error);
>         }
>         LOG.debug("The remote Exception was: " + error, error);
>         brokerService.getTaskRunnerFactory().execute(new Runnable() {
>             public void run() {
>                 ServiceSupport.dispose(getControllingService());
>             }
>         });
>         fireBridgeFailed();
>     }
> }
> private void fireBridgeFailed() {
>     NetworkBridgeListener l = this.networkBridgeListener;
>     if (l != null) {
>         l.bridgeFailed();
>     }
> }
> {code}
> {{DiscoveryNetworkConnector}} is the {{NetworkBridgeListener}}, and its
> {{bridgeFailed()}} method calls back to
> {{SimpleDiscoveryAgent.serviceFailed(...)}}:
> {code:title=DiscoveryNetworkConnector.java}
> protected NetworkBridge createBridge(Transport localTransport, Transport
>     remoteTransport, final DiscoveryEvent event) {
>     class DiscoverNetworkBridgeListener extends MBeanNetworkListener {
>         public DiscoverNetworkBridgeListener(BrokerService brokerService,
>             ObjectName connectorName) {
>             super(brokerService, connectorName);
>         }
>         public void bridgeFailed() {
>             if (!serviceSupport.isStopped()) {
>                 try {
>                     discoveryAgent.serviceFailed(event);
>                 } catch (IOException e) {
>                 }
>             }
>         }
>     }
>     ...
> {code}
> In response, {{SimpleDiscoveryAgent.serviceFailed(...)}} pauses for the
> {{reconnectDelay}} before attempting to re-establish the bridge via
> {{DiscoveryNetworkConnector.onServiceAdd(...)}}:
> {code:title=SimpleDiscoveryAgent.java}
> public void serviceFailed(DiscoveryEvent devent) throws IOException {
>     final SimpleDiscoveryEvent event = (SimpleDiscoveryEvent)devent;
>     if (event.failed.compareAndSet(false, true)) {
>         listener.onServiceRemove(event);
>         taskRunner.execute(new Runnable() {
>             public void run() {
>                 // We detect a failed connection attempt because the service
>                 // fails right away.
>                 if (event.connectTime + minConnectTime >
>                     System.currentTimeMillis()) {
>                     LOG.debug("Failure occurred soon after the discovery event was generated. "
>                         + "It will be classified as a connection failure: " + event);
>                     ...
>                     synchronized (sleepMutex) {
>                         try {
>                             if (!running.get()) {
>                                 LOG.debug("Reconnecting disabled: stopped");
>                                 return;
>                             }
>                             LOG.debug("Waiting " + event.reconnectDelay
>                                 + " ms before attempting to reconnect.");
>                             sleepMutex.wait(event.reconnectDelay);
>                         } catch (InterruptedException ie) {
>                             LOG.debug("Reconnecting disabled: " + ie);
>                             Thread.currentThread().interrupt();
>                             return;
>                         }
>                     }
>                     ...
>                 event.connectTime = System.currentTimeMillis();
>                 event.failed.set(false);
>                 listener.onServiceAdd(event);
>             }
>         }, "Simple Discovery Agent");
>     }
> }
> {code}
> *NOTE*: the call to {{listener.onServiceAdd(...)}} is made by a new thread!
> There are two race conditions that allow
> {{SimpleDiscoveryAgent.serviceFailed(...)}} to launch more than one thread,
> each attempting to re-establish the same bridge.
> First, note that
> {{DemandForwardingBridgeSupport.serviceLocal/RemoteException(...)}} launches
> a separate thread that stops the bridge:
> {code:title=DemandForwardingBridgeSupport.java}
> public void serviceRemoteException(Throwable error) {
>     if (!disposed.get()) {
>         if (error instanceof SecurityException || error instanceof
>             GeneralSecurityException) {
>             LOG.error("Network connection between " + localBroker + " and " +
>                 remoteBroker + " shutdown due to a remote error: " + error);
>         } else {
>             LOG.warn("Network connection between " + localBroker + " and " +
>                 remoteBroker + " shutdown due to a remote error: " + error);
>         }
>         LOG.debug("The remote Exception was: " + error, error);
>         brokerService.getTaskRunnerFactory().execute(new Runnable() {
>             public void run() {
>                 ServiceSupport.dispose(getControllingService());
>             }
>         });
>         fireBridgeFailed();
>     }
> }
> public void stop() throws Exception {
>     if (started.compareAndSet(true, false)) {
>         if (disposed.compareAndSet(false, true)) {
>             LOG.debug(" stopping " + configuration.getBrokerName()
>                 + " bridge to " + remoteBrokerName);
>             NetworkBridgeListener l = this.networkBridgeListener;
>             if (l != null) {
>                 l.onStop(this);
>             }
>             ...
> {code}
> When the bridge stops, the {{disposed}} flag is set, which prevents
> subsequent calls to {{serviceLocal/RemoteException(...)}} from calling
> {{fireBridgeFailed()}}. However, since the call to
> {{DemandForwardingBridgeSupport.stop()}} is made by a separate thread,
> multiple {{serviceLocal/RemoteException(...)}} calls that are made in quick
> succession can result in multiple calls to {{fireBridgeFailed()}}.
> This is the first race condition: multiple calls can be made to
> {{DiscoveryNetworkConnector.bridgeFailed()}} for the same bridge. By
> transitivity, this results in multiple calls to
> {{SimpleDiscoveryAgent.serviceFailed(...)}}.
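The first race condition is a check-then-act gap: the {{disposed}} flag is read on the calling thread but only set later, on another thread. A minimal sketch of that gap follows. It is not ActiveMQ code; the class {{AsyncDisposeRaceSketch}}, its queue of pending tasks, and the counter that stands in for {{fireBridgeFailed()}} are all hypothetical names used for illustration. The queue replaces the real task runner so that the unlucky thread scheduling can be reproduced deterministically.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical stand-in for a bridge; not the real DemandForwardingBridgeSupport. */
class AsyncDisposeRaceSketch {
    // Tasks handed to the "task runner" wait here until drained. This models
    // the disposal thread being scheduled after the exception callbacks.
    final Queue<Runnable> pendingTasks = new ArrayDeque<>();
    final AtomicBoolean disposed = new AtomicBoolean(false);
    final AtomicInteger bridgeFailedCalls = new AtomicInteger();

    void serviceRemoteException(Throwable error) {
        if (!disposed.get()) {                           // check ...
            pendingTasks.add(() -> disposed.set(true));  // ... but disposal is deferred
            bridgeFailedCalls.incrementAndGet();         // stands in for fireBridgeFailed()
        }
    }

    void runPendingTasks() {
        Runnable task;
        while ((task = pendingTasks.poll()) != null) {
            task.run();
        }
    }

    /** Three errors arrive before disposal runs, then one more arrives afterwards. */
    static int simulate() {
        AsyncDisposeRaceSketch bridge = new AsyncDisposeRaceSketch();
        for (int i = 0; i < 3; i++) {
            bridge.serviceRemoteException(new RuntimeException("error " + i));
        }
        bridge.runPendingTasks();  // disposed is only set now
        bridge.serviceRemoteException(new RuntimeException("late error"));
        return bridge.bridgeFailedCalls.get();
    }
}
```

In this sketch, {{simulate()}} counts three "bridge failed" notifications: every error that arrives before the deferred disposal runs passes the {{disposed}} check, and only the late error is filtered out.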
> {{SimpleDiscoveryAgent.serviceFailed(...)}} has a guard clause,
> {{event.failed.compareAndSet(false, true)}}, which should only allow the
> first call to launch a bridge reconnect thread. However, once the
> {{reconnectDelay}} expires, {{event.failed}} is reset to {{false}}, which
> allows re-entry to the failure handling logic, and the possible launching of
> additional bridge reconnect threads if the {{reconnectDelay}} is short or the
> threads calling {{serviceFailed(...)}} are delayed.
> This is the second race condition: the guard clause in
> {{SimpleDiscoveryAgent.serviceFailed(...)}} can be reset before the
> subsequent redundant calls have been filtered out.
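The second race condition can be reproduced in isolation with a small model of the guard. The sketch below is an illustration under stated assumptions, not the real {{SimpleDiscoveryAgent}}: the {{GuardResetRaceSketch}} class, its nested {{Event}} type, and the task queue are hypothetical, and the queue stands in for the reconnect thread so that the ordering is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the unpatched guard; not the real SimpleDiscoveryAgent. */
class GuardResetRaceSketch {
    static class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);
    }

    // Queued reconnect work; draining the queue models the reconnect thread
    // waking up after reconnectDelay.
    final Queue<Runnable> reconnectTasks = new ArrayDeque<>();
    final AtomicInteger serviceAddCalls = new AtomicInteger();

    void serviceFailed(Event event) {
        if (event.failed.compareAndSet(false, true)) {
            reconnectTasks.add(() -> {
                event.failed.set(false);            // guard is reset on the SAME event
                serviceAddCalls.incrementAndGet();  // stands in for listener.onServiceAdd(event)
            });
        }
    }

    void runReconnectTasks() {
        Runnable task;
        while ((task = reconnectTasks.poll()) != null) {
            task.run();
        }
    }

    /** One bridge failure reported twice, with the second report delayed. */
    static int simulate() {
        GuardResetRaceSketch agent = new GuardResetRaceSketch();
        Event event = new Event();
        agent.serviceFailed(event);   // first report passes the guard
        agent.runReconnectTasks();    // delay expires: guard reset, reconnect attempt 1
        agent.serviceFailed(event);   // delayed duplicate report passes the guard again
        agent.runReconnectTasks();    // reconnect attempt 2 for the same failure
        return agent.serviceAddCalls.get();
    }
}
```

Here a single failure that is reported twice produces two reconnect attempts, because the duplicate report arrives after the guard has already been reset.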
> These two race conditions allow a single call to
> {{DiscoveryNetworkConnector.onServiceAdd(...)}} to result in multiple
> subsequent concurrent (re)calls, and these concurrent calls can spawn their
> own multiple concurrent calls. The result can be an unlimited number of
> concurrent calls to {{onServiceAdd(...)}}.
> Unit Test
> =========
> The attached unit test demonstrates this bug by simulating a bridge failure
> that has yet to be detected by the remote broker (i.e., before the
> {{InactivityMonitor}} closes the connection). The local broker attempts to
> re-establish the bridge, but its call to
> {{DemandForwardingBridgeSupport.startRemoteBridge()}} fails because the remote
> broker rejects the new connection since the old one still exists. Since
> {{startRemoteBridge()}} sends multiple messages to the remote broker, multiple
> exceptions are generated:
> {code:title=DemandForwardingBridgeSupport.java}
> protected void startRemoteBridge() throws Exception {
>     ...
>     remoteBroker.oneway(brokerInfo);
>     ...
>     remoteBroker.oneway(remoteConnectionInfo);
>     ...
>     remoteBroker.oneway(producerInfo);
>     ...
>     remoteBroker.oneway(demandConsumerInfo);
> }
> {code}
> The multiple exceptions result in multiple calls to
> {{DemandForwardingBridgeSupport.serviceRemoteException(...)}}, which allows
> the first race condition to be exhibited.
> The first unit test has a 1s {{reconnectDelay}}, which is sufficient to make
> the second race condition improbable; therefore, this test generally passes.
> The second unit test has a 0s {{reconnectDelay}}; on my system, this makes the
> timing of multiple calls to
> {{DemandForwardingBridgeSupport.serviceRemoteException(...)}} such that the
> second race condition is reliably exhibited, resulting in the unit test
> failing because it detects concurrent calls to
> {{DiscoveryNetworkConnector.onServiceAdd(...)}}.
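One way a test might detect concurrent calls to a method is to count how many callers are inside it at once and remember the maximum. The sketch below shows that idea only; it is not taken from the attached AMQ4159Test.java, and the {{ConcurrentCallDetector}} class and its method names are hypothetical. A latch holds every caller inside the guarded section until all have entered, so the overlap is forced rather than left to timing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that records the peak number of overlapping calls. */
class ConcurrentCallDetector {
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicInteger maxActive = new AtomicInteger();

    /** Call on entry to the method under observation. */
    void enter() {
        int now = active.incrementAndGet();
        maxActive.accumulateAndGet(now, Math::max);
    }

    /** Call on exit from the method under observation. */
    void exit() {
        active.decrementAndGet();
    }

    int maxConcurrentCalls() {
        return maxActive.get();
    }

    /** Starts the given number of threads and keeps them all inside at once. */
    static int simulateOverlap(int callers) {
        ConcurrentCallDetector detector = new ConcurrentCallDetector();
        CountDownLatch allInside = new CountDownLatch(callers);
        Thread[] threads = new Thread[callers];
        for (int i = 0; i < callers; i++) {
            threads[i] = new Thread(() -> {
                detector.enter();
                try {
                    allInside.countDown();
                    // Stay inside until every caller has entered (or give up after 5s).
                    allInside.await(5, TimeUnit.SECONDS);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    detector.exit();
                }
            });
            threads[i].start();
        }
        try {
            for (Thread t : threads) {
                t.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for callers", e);
        }
        return detector.maxConcurrentCalls();
    }
}
```

A test built on this idea would call {{enter()}} and {{exit()}} around the observed method and fail if {{maxConcurrentCalls()}} ever exceeds 1.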
> Solution
> ========
> While it would be possible to add a {{failed.compareAndSet(false,true)}}
> guard clause to {{DemandForwardingBridgeSupport.fireBridgeFailed()}}, and
> prevent the first race condition from allowing multiple calls to
> {{SimpleDiscoveryAgent.serviceFailed()}}, the root problem is the race
> condition in {{serviceFailed}}. This can be trivially addressed by making a
> copy of the {{DiscoveryEvent}}, which prevents the original {{event.failed}}
> guard clause from being reset:
> {code:title=Patched SimpleDiscoveryAgent.java}
> public void serviceFailed(DiscoveryEvent devent) throws IOException {
>     final SimpleDiscoveryEvent sevent = (SimpleDiscoveryEvent)devent;
>     if (sevent.failed.compareAndSet(false, true)) {
>         listener.onServiceRemove(sevent);
>         taskRunner.execute(new Runnable() {
>             public void run() {
>                 SimpleDiscoveryEvent event = new SimpleDiscoveryEvent(sevent);
>                 ...
>                 event.connectTime = System.currentTimeMillis();
>                 event.failed.set(false);
>                 listener.onServiceAdd(event);
>             }
>         }, "Simple Discovery Agent");
>     }
> }
> {code}
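The effect of copying the event can be checked with a small model. The sketch below is a hypothetical illustration, not the patch itself: {{CopiedEventGuardSketch}}, its nested {{Event}} type with a copy constructor, and the task queue are assumed names, and the queue stands in for the reconnect thread so that the ordering is deterministic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical model of the copy-based guard; not the real SimpleDiscoveryAgent. */
class CopiedEventGuardSketch {
    static class Event {
        final AtomicBoolean failed = new AtomicBoolean(false);
        Event() { }
        Event(Event original) { failed.set(original.failed.get()); }
    }

    final Queue<Runnable> reconnectTasks = new ArrayDeque<>();
    final AtomicInteger serviceAddCalls = new AtomicInteger();
    Event lastAdded;  // the event most recently handed to "onServiceAdd"

    void serviceFailed(Event original) {
        if (original.failed.compareAndSet(false, true)) {
            reconnectTasks.add(() -> {
                Event copy = new Event(original);
                copy.failed.set(false);             // only the copy is reset
                lastAdded = copy;
                serviceAddCalls.incrementAndGet();  // stands in for listener.onServiceAdd(copy)
            });
        }
    }

    void runReconnectTasks() {
        Runnable task;
        while ((task = reconnectTasks.poll()) != null) {
            task.run();
        }
    }

    /** Returns {calls after a duplicate report, calls after a genuine new failure}. */
    static int[] simulate() {
        CopiedEventGuardSketch agent = new CopiedEventGuardSketch();
        Event first = new Event();
        agent.serviceFailed(first);            // first report passes the guard
        agent.runReconnectTasks();             // reconnect attempt 1, made with a copy
        agent.serviceFailed(first);            // delayed duplicate: original is still failed
        agent.runReconnectTasks();
        int afterDuplicate = agent.serviceAddCalls.get();
        agent.serviceFailed(agent.lastAdded);  // new bridge fails: reported on the copy
        agent.runReconnectTasks();
        int afterNewFailure = agent.serviceAddCalls.get();
        return new int[] { afterDuplicate, afterNewFailure };
    }
}
```

In this model the delayed duplicate report is ignored because the original event stays marked as failed, while a later failure reported against the fresh copy still triggers a new reconnect attempt.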