Repository: activemq-artemis Updated Branches: refs/heads/master 3e48cd778 -> 14723365a
ARTEMIS-1283 Fix delay on drained response On completion of drain the response is not flushed and the client can wait a few seconds before another broker task flushes the work. Flush the connection after updating the link as being drained. Also perform the work with the connection lock held to prevent concurrent update of proton state. Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/22b8076b Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/22b8076b Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/22b8076b Branch: refs/heads/master Commit: 22b8076b714d4958589946a684225f19e055da78 Parents: 3e48cd7 Author: Timothy Bish <[email protected]> Authored: Wed Jul 12 18:19:28 2017 -0400 Committer: Timothy Bish <[email protected]> Committed: Wed Jul 12 19:20:01 2017 -0400 ---------------------------------------------------------------------- .../protocol/amqp/broker/AMQPSessionCallback.java | 2 +- .../amqp/proton/ProtonServerSenderContext.java | 14 ++++++++++++++ .../artemis/core/server/impl/ServerConsumerImpl.java | 4 ++-- 3 files changed, 17 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/22b8076b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java index 0add7b7..ed15a56 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java +++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java @@ -129,7 +129,7 @@ public class AMQPSessionCallback implements SessionCallback { @Override public void run() { try { - plugSender.getSender().drained(); + plugSender.reportDrained(); } finally { draining.set(false); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/22b8076b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 8f8222b..868e9c8 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -775,4 +775,18 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr return queue; } } + + /** + * Update link state to reflect that the previous drain attempt has completed. 
+ */ + public void reportDrained() { + connection.lock(); + try { + sender.drained(); + } finally { + connection.unlock(); + } + + connection.flush(); + } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/22b8076b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java ---------------------------------------------------------------------- diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java index 296088b..f614fa1 100644 --- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java +++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java @@ -579,10 +579,10 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener { forceDelivery(sequence, r); } }); - } else { - r.run(); + return; } } + r.run(); } catch (Exception e) { ActiveMQServerLogger.LOGGER.errorSendingForcedDelivery(e); }
