Repository: qpid-broker-j
Updated Branches:
  refs/heads/6.1.x 9493668e8 -> 4748dbc99
QPID-7774: [AMQP 0-8..0-91] Ensure failover latch is nulled on all paths following a successful failover

Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/28236751
Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/28236751
Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/28236751

Branch: refs/heads/6.1.x
Commit: 28236751e5c4edcc00532fe33a1338b36308aed3
Parents: 9493668
Author: Keith Wall <[email protected]>
Authored: Thu May 11 15:23:27 2017 +0100
Committer: Alex Rudyy <[email protected]>
Committed: Fri May 12 13:14:20 2017 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/client/AMQConnection.java | 1 +
 .../apache/qpid/client/AMQProtocolHandler.java | 28 +++++++++++++-------
 .../org/apache/qpid/client/FailoverHandler.java | 5 +---
 3 files changed, 21 insertions(+), 13 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/28236751/client/src/main/java/org/apache/qpid/client/AMQConnection.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index b650c55..4207746 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -43,6 +43,7 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.ScheduledExecutorService;

http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/28236751/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java index bff7ace..f68d1a9 100644 --- a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java +++ b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java @@ -277,14 +277,24 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, // the fail over. setFailoverLatch(new CountDownLatch(1)); - // We wake up listeners. If they can handle failover, they will extend the - // FailoverRetrySupport class and will in turn block on the latch until failover - // has completed before retrying the operation. - notifyFailoverStarting(); - - getConnection().doWithAllLocks(_failoverHandler); + try + { + // We wake up listeners. If they can handle failover, they will extend the + // FailoverRetrySupport class and will in turn block on the latch until failover + // has completed before retrying the operation. 
+ notifyFailoverStarting(); - getFailoverLatch().countDown(); + getConnection().doWithAllLocks(_failoverHandler); + } + finally + { + CountDownLatch failoverLatch = getFailoverLatch(); + if (failoverLatch != null) + { + failoverLatch.countDown(); + setFailoverLatch(null); + } + } } }); } @@ -731,9 +741,9 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver, { if (_failoverLatch != null) { - if(!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS)) + if (!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS)) { - + _logger.debug("Timed out after waiting {}ms for failover to complete.", MAXIMUM_STATE_WAIT_TIME); } } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/28236751/client/src/main/java/org/apache/qpid/client/FailoverHandler.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/qpid/client/FailoverHandler.java b/client/src/main/java/org/apache/qpid/client/FailoverHandler.java index bc89dc6..d90115e 100644 --- a/client/src/main/java/org/apache/qpid/client/FailoverHandler.java +++ b/client/src/main/java/org/apache/qpid/client/FailoverHandler.java @@ -52,6 +52,7 @@ public class FailoverHandler implements Runnable /** * Performs the failover procedure. */ + @Override public void run() { AMQConnection connection = _amqProtocolHandler.getConnection(); @@ -82,10 +83,6 @@ public class FailoverHandler implements Runnable AMQDisconnectedException cause = new AMQDisconnectedException("Failover was vetoed by client", null); connection.closed(cause); - - _amqProtocolHandler.getFailoverLatch().countDown(); - _amqProtocolHandler.setFailoverLatch(null); - return; } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
