Repository: activemq-artemis
Updated Branches:
  refs/heads/master 3e48cd778 -> 14723365a


ARTEMIS-1283 Fix delay on drained response

On completion of drain the response is not flushed and the
client can wait a few seconds before another broker task
flushes the work.  Flush the connection after updating the
linked as being drained.  Also perform the work with the
connection lock held to prevent conccurent update of proton
state.


Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/22b8076b
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/22b8076b
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/22b8076b

Branch: refs/heads/master
Commit: 22b8076b714d4958589946a684225f19e055da78
Parents: 3e48cd7
Author: Timothy Bish <[email protected]>
Authored: Wed Jul 12 18:19:28 2017 -0400
Committer: Timothy Bish <[email protected]>
Committed: Wed Jul 12 19:20:01 2017 -0400

----------------------------------------------------------------------
 .../protocol/amqp/broker/AMQPSessionCallback.java     |  2 +-
 .../amqp/proton/ProtonServerSenderContext.java        | 14 ++++++++++++++
 .../artemis/core/server/impl/ServerConsumerImpl.java  |  4 ++--
 3 files changed, 17 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/22b8076b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
index 0add7b7..ed15a56 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/broker/AMQPSessionCallback.java
@@ -129,7 +129,7 @@ public class AMQPSessionCallback implements SessionCallback 
{
                @Override
                public void run() {
                   try {
-                     plugSender.getSender().drained();
+                     plugSender.reportDrained();
                   } finally {
                      draining.set(false);
                   }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/22b8076b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
----------------------------------------------------------------------
diff --git 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
index 8f8222b..868e9c8 100644
--- 
a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
+++ 
b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java
@@ -775,4 +775,18 @@ public class ProtonServerSenderContext extends 
ProtonInitializable implements Pr
          return queue;
       }
    }
+
+   /**
+    * Update link state to reflect that the previous drain attempt has 
completed.
+    */
+   public void reportDrained() {
+      connection.lock();
+      try {
+         sender.drained();
+      } finally {
+         connection.unlock();
+      }
+
+      connection.flush();
+   }
 }

http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/22b8076b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 296088b..f614fa1 100644
--- 
a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ 
b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -579,10 +579,10 @@ public class ServerConsumerImpl implements 
ServerConsumer, ReadyListener {
                            forceDelivery(sequence, r);
                         }
                      });
-                  } else {
-                     r.run();
+                     return;
                   }
                }
+               r.run();
             } catch (Exception e) {
                ActiveMQServerLogger.LOGGER.errorSendingForcedDelivery(e);
             }

Reply via email to