AMQ-6813: update tick deadline handling to account for the deadline potentially being negative, since it is derived from nanoTime values, plus other edge-case protections
(cherry picked from commit f82eccd2f504b59c2e98ba8273e28f4d7a2a8698) Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/4f6326f4 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/4f6326f4 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/4f6326f4 Branch: refs/heads/activemq-5.15.x Commit: 4f6326f4fb35867b6d83c624a947a4510d0f674f Parents: e1ac826 Author: Robbie Gemmell <[email protected]> Authored: Fri Sep 15 17:52:36 2017 +0100 Committer: Robbie Gemmell <[email protected]> Committed: Fri Sep 15 18:04:01 2017 +0100 ---------------------------------------------------------------------- .../transport/amqp/protocol/AmqpConnection.java | 15 ++++++++++----- .../transport/amqp/client/AmqpConnection.java | 15 ++++++++------- 2 files changed, 18 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/4f6326f4/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java index 57b2502..dec1bc9 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/protocol/AmqpConnection.java @@ -251,12 +251,16 @@ public class AmqpConnection implements AmqpProtocolConverter { if (protonConnection.getLocalState() != EndpointState.CLOSED) { // Using nano time since it is not related to the wall clock, which may change long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - rescheduleAt = protonTransport.tick(now) - now; + long deadline = protonTransport.tick(now); pumpProtonToSocket(); if 
(protonTransport.isClosed()) { - rescheduleAt = 0; LOG.debug("Transport closed after inactivity check."); - throw new InactivityIOException("Channel was inactive for to long"); + throw new InactivityIOException("Channel was inactive for too long"); + } else { + if(deadline != 0) { + // caller treats 0 as no-work, ensure value is at least 1 as there was a deadline + rescheduleAt = Math.max(deadline - now, 1); + } } } @@ -835,8 +839,9 @@ public class AmqpConnection implements AmqpProtocolConverter { // Using nano time since it is not related to the wall clock, which may change long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); long nextIdleCheck = protonTransport.tick(now); - if (nextIdleCheck > 0) { - long delay = nextIdleCheck - now; + if (nextIdleCheck != 0) { + // monitor treats <= 0 as no work, ensure value is at least 1 as there was a deadline + long delay = Math.max(nextIdleCheck - now, 1); LOG.trace("Connection keep-alive processing starts in: {}", delay); monitor.startKeepAliveTask(delay); } else { http://git-wip-us.apache.org/repos/asf/activemq/blob/4f6326f4/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 1f3fe09..3fe3ab6 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -587,7 +587,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements // Using nano time since it is not related to the wall clock, which may change long initialNow = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); long initialKeepAliveDeadline = protonTransport.tick(initialNow); - if 
(initialKeepAliveDeadline > 0) { + if (initialKeepAliveDeadline != 0) { getScheduler().schedule(new Runnable() { @@ -598,15 +598,16 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements LOG.debug("Client performing next idle check"); // Using nano time since it is not related to the wall clock, which may change long now = TimeUnit.NANOSECONDS.toMillis(System.nanoTime()); - long rescheduleAt = protonTransport.tick(now) - now; + long deadline = protonTransport.tick(now); + pumpToProtonTransport(); if (protonTransport.isClosed()) { LOG.debug("Transport closed after inactivity check."); - throw new InactivityIOException("Channel was inactive for to long"); - } - - if (rescheduleAt > 0) { - getScheduler().schedule(this, rescheduleAt, TimeUnit.MILLISECONDS); + throw new InactivityIOException("Channel was inactive for too long"); + } else { + if(deadline != 0) { + getScheduler().schedule(this, deadline - now, TimeUnit.MILLISECONDS); + } } } } catch (Exception e) {
