Repository: qpid-dispatch
Updated Branches:
  refs/heads/1.1.x d44e8a98b -> 2e48f9042


DISPATCH-966 - Fixed two bugs preventing very large inter-router messages from 
being delivered.  These were regressions introduced with large-message 
streaming.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/2e48f904
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/2e48f904
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/2e48f904

Branch: refs/heads/1.1.x
Commit: 2e48f9042fca10a1657a6a2afa50f93fabb6338f
Parents: d44e8a9
Author: Ted Ross <tr...@redhat.com>
Authored: Thu May 17 12:54:04 2018 -0400
Committer: Ted Ross <tr...@redhat.com>
Committed: Thu May 17 12:56:05 2018 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/message.h | 12 ++----------
 src/message.c                   | 18 +++++++++---------
 src/message_private.h           |  1 +
 src/router_core/forwarder.c     |  9 ++++++---
 src/router_core/transfer.c      |  4 ++--
 5 files changed, 20 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2e48f904/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 78c0b95..ca2ab47 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -365,19 +365,11 @@ size_t qd_message_fanout(qd_message_t *msg);
 void qd_message_add_fanout(qd_message_t *msg);
 
 /**
- * Setter for message Q2 input_holdoff state
+ * Disable the Q2-holdoff for this message.
  *
  * @param msg A pointer to the message
  */
-void qd_message_set_Q2_input_holdoff(qd_message_t *msg, bool holdoff);
-
-/**
- * Accessor for message Q2 input_holdoff state
- *
- * @param msg A pointer to the message
- * @return true if input is being held off
- */
-bool qd_message_get_Q2_input_holdoff(qd_message_t *msg);
+void qd_message_Q2_holdoff_disable(qd_message_t *msg);
 
 /**
  * Test if attempt to retreive message data through qd_message_recv should block

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2e48f904/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 03f475b..ba45ff4 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1933,21 +1933,21 @@ int qd_message_get_phase_val(qd_message_t *msg)
 }
 
 
-void qd_message_set_Q2_input_holdoff(qd_message_t *msg, bool holdoff)
+void qd_message_Q2_holdoff_disable(qd_message_t *msg)
 {
-    ((qd_message_pvt_t*)msg)->content->q2_input_holdoff = holdoff;
-}
-
-
-bool qd_message_get_Q2_input_holdoff(qd_message_t *msg)
-{
-    return ((qd_message_pvt_t*)msg)->content->q2_input_holdoff;
+    if (!msg)
+        return;
+    qd_message_pvt_t *msg_pvt = (qd_message_pvt_t*) msg;
+    msg_pvt->content->disable_q2_holdoff = true;
 }
 
 
 bool qd_message_Q2_holdoff_should_block(qd_message_t *msg)
 {
-    return DEQ_SIZE(((qd_message_pvt_t*)msg)->content->buffers) >= QD_QLIMIT_Q2_UPPER;
+    if (!msg)
+        return false;
+    qd_message_pvt_t *msg_pvt = (qd_message_pvt_t*) msg;
+    return !msg_pvt->content->disable_q2_holdoff && DEQ_SIZE(msg_pvt->content->buffers) >= QD_QLIMIT_Q2_UPPER;
 }
 
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2e48f904/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index 62d438b..fe8147f 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -114,6 +114,7 @@ typedef struct {
     bool                 receive_complete;               // true if the message has been completely received, false otherwise
     bool                 q2_input_holdoff;               // hold off calling pn_link_recv
     bool                 aborted;                        // receive completed with abort flag set
+    bool                 disable_q2_holdoff;             // Disable the Q2 flow control
 } qd_message_content_t;
 
 typedef struct {

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2e48f904/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index 7ab8a46..fca19c3 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -346,13 +346,14 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
             //
             if (receive_complete)
                 qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg);
-            else
+            else {
                 //
                 // Receive is not complete, we will store the sub in in_delivery->subscriptions so we can send the message to the subscription
                 // after the message fully arrives
                 //
                 DEQ_INSERT_TAIL(in_delivery->subscriptions, sub);
-
+                qd_message_Q2_holdoff_disable(msg);
+            }
 
             fanout++;
             addr->deliveries_to_container++;
@@ -408,12 +409,14 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
             //
             if (receive_complete)
                 qdr_forward_on_message_CT(core, sub, in_delivery ? in_delivery->link : 0, msg);
-            else
+            else {
                 //
                 // Receive is not complete, we will store the sub in in_delivery->subscriptions so we can send the message to the subscription
                 // after the message fully arrives
                 //
                 DEQ_INSERT_TAIL(in_delivery->subscriptions, sub);
+                qd_message_Q2_holdoff_disable(msg);
+            }
 
             //
             // If the incoming delivery is not settled, it should be accepted and settled here.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/2e48f904/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index e8d37ef..706a26d 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -1144,9 +1144,9 @@ static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, bool
     qdr_delivery_decref_CT(core, in_dlv, "qdr_deliver_continue_CT - remove from action");
 
     //
-    // If it is already in the undelivered list or it has no peers, don't try to deliver this again.
+    // If it is already in the undelivered list, don't try to deliver this again.
     //
-    if (in_dlv->where == QDR_DELIVERY_IN_UNDELIVERED || !qdr_delivery_has_peer_CT(in_dlv))
+    if (in_dlv->where == QDR_DELIVERY_IN_UNDELIVERED)
         return;
 
     qdr_deliver_continue_peers_CT(core, in_dlv);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to