QPID-7774: Improve locking when using failover latch
Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/4748dbc9 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/4748dbc9 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/4748dbc9 Branch: refs/heads/6.1.x Commit: 4748dbc9939e711c5acdf191edf61a3d7a28959d Parents: 2823675 Author: Alex Rudyy <[email protected]> Authored: Fri May 12 10:17:55 2017 +0100 Committer: Alex Rudyy <[email protected]> Committed: Fri May 12 13:14:43 2017 +0100 ---------------------------------------------------------------------- .../org/apache/qpid/client/AMQConnection.java | 1 - .../qpid/client/AMQConnectionDelegate_0_10.java | 76 ++++++++++---------- .../apache/qpid/client/AMQProtocolHandler.java | 25 +++---- 3 files changed, 48 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4748dbc9/client/src/main/java/org/apache/qpid/client/AMQConnection.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/client/src/main/java/org/apache/qpid/client/AMQConnection.java index 4207746..b650c55 100644 --- a/client/src/main/java/org/apache/qpid/client/AMQConnection.java +++ b/client/src/main/java/org/apache/qpid/client/AMQConnection.java @@ -43,7 +43,6 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.ScheduledExecutorService; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4748dbc9/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java b/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java index 86ab1ff..0726a59 100644 --- a/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java +++ b/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java @@ -331,53 +331,53 @@ public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate, Connec final ConnectionClose close = exc.getClose(); if (close == null || close.getReplyCode() == ConnectionCloseCode.CONNECTION_FORCED) { - _conn.getProtocolHandler().setFailoverLatch(new CountDownLatch(1)); - - _qpidConnection.notifyFailoverRequired(); - + final CountDownLatch failoverLatch = new CountDownLatch(1); + _conn.getProtocolHandler().setFailoverLatch(failoverLatch); final AtomicBoolean failoverDone = new AtomicBoolean(); - - _conn.doWithAllLocks(new Runnable() + try { - @Override - public void run() + _qpidConnection.notifyFailoverRequired(); + _conn.doWithAllLocks(new Runnable() { - try + @Override + public void run() { - boolean preFailover = _conn.firePreFailover(false); - if (preFailover) + try { - boolean reconnected; - if(exc instanceof RedirectConnectionException) + boolean preFailover = _conn.firePreFailover(false); + if (preFailover) { - RedirectConnectionException redirect = (RedirectConnectionException)exc; - reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts()); - } - else - { - reconnected = _conn.attemptReconnection(); - } - if(reconnected) - { - failoverPrep(); - _conn.resubscribeSessions(); - _conn.fireFailoverComplete(); - failoverDone.set(true); + boolean reconnected; + if (exc instanceof RedirectConnectionException) + { + RedirectConnectionException redirect = (RedirectConnectionException) exc; + reconnected = attemptRedirection(redirect.getHost(), redirect.getKnownHosts()); + } + else + { + reconnected = _conn.attemptReconnection(); + } + if (reconnected) + { + failoverPrep(); + _conn.resubscribeSessions(); + _conn.fireFailoverComplete(); + failoverDone.set(true); + } } } + catch (Exception e) + { + _logger.error("error during failover", e); + } } - catch (Exception e) - { - _logger.error("error during failover", e); - } - finally - { - _conn.getProtocolHandler().getFailoverLatch().countDown(); - _conn.getProtocolHandler().setFailoverLatch(null); - } - - } - }); + }); + } + finally + { + failoverLatch.countDown(); + _conn.getProtocolHandler().setFailoverLatch(null); + } if (failoverDone.get()) http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/4748dbc9/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java index f68d1a9..ca3c900 100644 --- a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java +++ b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java @@ -275,7 +275,8 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, // Create a latch, upon which tasks that must not run in parallel with a failover can wait for completion of // the fail over. - setFailoverLatch(new CountDownLatch(1)); + final CountDownLatch failoverLatch = new CountDownLatch(1); + setFailoverLatch(failoverLatch); try { @@ -288,12 +289,8 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, } finally { - CountDownLatch failoverLatch = getFailoverLatch(); - if (failoverLatch != null) - { - failoverLatch.countDown(); - setFailoverLatch(null); - } + failoverLatch.countDown(); + setFailoverLatch(null); } } }); @@ -737,14 +734,12 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, public void blockUntilNotFailingOver() throws InterruptedException { - synchronized(_failoverLatchChange) + CountDownLatch failoverLatch = getFailoverLatch(); + if (failoverLatch != null) { - if (_failoverLatch != null) + if (!failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS)) { - if (!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS)) - { - _logger.debug("Timed out after waiting {}ms for failover to complete.", MAXIMUM_STATE_WAIT_TIME); - } + _logger.debug("Timed out after waiting {}ms for failover to complete.", MAXIMUM_STATE_WAIT_TIME); } } } @@ -762,7 +757,7 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, return queueName.replaceAll("_+", "_"); } - public CountDownLatch getFailoverLatch() + CountDownLatch getFailoverLatch() { synchronized (_failoverLatchChange) { @@ -770,7 +765,7 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, } } - public void setFailoverLatch(CountDownLatch failoverLatch) + void setFailoverLatch(CountDownLatch failoverLatch) { synchronized (_failoverLatchChange) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
