DISPATCH-767 - Added code to support multi frame message streaming
1. Added new core API function qdr_deliver_continue() that will continue
delivering a large message
2. Modified qdr_link_process_deliveries() to not remove deliveries from
the undelivered list until they are fully delivered
3. Modified qd_message_receive() to recieve partial messages.
4. Modified qd_message_send() to be able to handle streaming send. This
function can now be called multiple times for the same message. It keeps
internal pointers to the point upto which the message has been sent and
is able to continue where it left off. Message content buffers are freed
as soon as the message has been sent to all recipients.
5. Added peer linkage for large settled deliveries and added a settled
list to handle with abrupt connection terminations when large messages
are being transmitted.
6. Added unit tests to test large messages.


Project: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/commit/c9262728
Tree: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/tree/c9262728
Diff: http://git-wip-us.apache.org/repos/asf/qpid-dispatch/diff/c9262728

Branch: refs/heads/master
Commit: c9262728dc1a6d1bcdc5a68f4b19f1b9d2221d53
Parents: 1ca80b6
Author: Ganesh Murthy <gmur...@redhat.com>
Authored: Wed Jul 5 11:51:06 2017 -0400
Committer: Ganesh Murthy <gmur...@redhat.com>
Committed: Thu Aug 10 16:10:09 2017 -0400

----------------------------------------------------------------------
 include/qpid/dispatch/buffer.h        |  21 ++
 include/qpid/dispatch/message.h       |  61 ++++-
 include/qpid/dispatch/router_core.h   |   8 +-
 src/buffer.c                          |  35 ++-
 src/message.c                         | 360 +++++++++++++++++++-------
 src/message_private.h                 |  15 +-
 src/router_core/connections.c         |  36 +++
 src/router_core/forwarder.c           |  80 ++++--
 src/router_core/router_core.c         |   9 +
 src/router_core/router_core_private.h |  31 ++-
 src/router_core/transfer.c            | 324 ++++++++++++++++++++----
 src/router_node.c                     | 393 ++++++++++++++++-------------
 tests/system_tests_one_router.py      |  61 ++++-
 tests/system_tests_two_routers.py     |  61 ++++-
 14 files changed, 1133 insertions(+), 362 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/include/qpid/dispatch/buffer.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/buffer.h b/include/qpid/dispatch/buffer.h
index e913377..d4fcd15 100644
--- a/include/qpid/dispatch/buffer.h
+++ b/include/qpid/dispatch/buffer.h
@@ -27,15 +27,19 @@
  */
 
 #include <qpid/dispatch/ctools.h>
+#include <qpid/dispatch/atomic.h>
 
 typedef struct qd_buffer_t qd_buffer_t;
 
 DEQ_DECLARE(qd_buffer_t, qd_buffer_list_t);
 
+extern size_t BUFFER_SIZE;
+
 /** A raw byte buffer .*/
 struct qd_buffer_t {
     DEQ_LINKS(qd_buffer_t);
     unsigned int size;          ///< Size of data content
+    sys_atomic_t bfanout;        // The number of receivers for this buffer
 };
 
 /**
@@ -117,6 +121,23 @@ void qd_buffer_list_free_buffers(qd_buffer_list_t *list);
  */
 unsigned int qd_buffer_list_length(const qd_buffer_list_t *list);
 
+/*
+ * Increase the fanout by 1. How many receivers should this buffer be sent to.
+ */
+void qd_buffer_add_fanout(qd_buffer_t *buf);
+
+/**
+ * Return the buffer's fanout.
+ */
+size_t qd_buffer_fanout(qd_buffer_t *buf);
+
+/**
+ * Advance the buffer by len. Does not manipulate the contents of the buffer
+ * @param buf A pointer to an allocated buffer
+ * @param len The number of octets that by which the buffer should be advanced.
+ */
+unsigned char *qd_buffer_at(qd_buffer_t *buf, size_t len);
+
 
 ///@}
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/include/qpid/dispatch/message.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index d3f2bb4..111c6a0 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -201,9 +201,9 @@ int  qd_message_get_phase_annotation(const qd_message_t 
*msg);
 void qd_message_set_ingress_annotation(qd_message_t *msg, qd_composed_field_t 
*ingress_field);
 
 /**
- * Receive message data via a delivery.  This function may be called more than 
once on the same
- * delivery if the message spans multiple frames.  Once a complete message has 
been received, this
- * function shall return the message.
+ * Receive message data frame by frame via a delivery.  This function may be 
called more than once on the same
+ * delivery if the message spans multiple frames. Always returns a message. 
The message buffers are filled up to the point with the data that was been 
received so far.
+ * The buffer keeps filling up on successive calls to this function.
  *
  * @param delivery An incoming delivery from a link
  * @return A pointer to the complete message or 0 if the message is not yet 
complete.
@@ -296,6 +296,61 @@ qd_parsed_field_t *qd_message_get_trace      (qd_message_t 
*msg);
  */
 int                qd_message_get_phase_val  (qd_message_t *msg);
 
+/*
+ * Should the message be discarded.
+ * A message can be discarded if the disposition is released or rejected.
+ *
+ * @param msg A pointer to the message.
+ **/
+bool qd_message_is_discard(qd_message_t *msg);
+
+/**
+ *Set the discard field on the message to to the passed in boolean value.
+ *
+ * @param msg A pointer to the message.
+ * @param discard - the boolean value of discard.
+ */
+void qd_message_set_discard(qd_message_t *msg, bool discard);
+
+/**
+ * Has the message been completely received?
+ * Return true if the message is fully received
+ * Returns false if only the partial message has been received, if there is 
more of the message to be received.
+ *
+ * @param msg A pointer to the message.
+ */
+bool qd_message_receive_complete(qd_message_t *msg);
+
+/**
+ * Returns true if the message has been completely received AND the message 
has been completely sent.
+ */
+bool qd_message_send_complete(qd_message_t *msg);
+
+/**
+ * Returns true if the delivery tag has already been sent.
+ */
+bool qd_message_tag_sent(qd_message_t *msg);
+
+
+/**
+ * Sets if the delivery tag has already been sent out or not.
+ */
+void qd_message_set_tag_sent(qd_message_t *msg, bool tag_sent);
+
+/**
+ * Get the number of receivers for this message.
+ *
+ * @param msg A pointer to the message.
+ */
+size_t qd_message_fanout(qd_message_t *msg);
+
+/**
+ * Increase the fanout of the message by 1.
+ *
+ * @param msg A pointer to the message.
+ */
+void qd_message_add_fanout(qd_message_t *msg);
+
 ///@}
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/include/qpid/dispatch/router_core.h
----------------------------------------------------------------------
diff --git a/include/qpid/dispatch/router_core.h 
b/include/qpid/dispatch/router_core.h
index 0031ed7..18499e0 100644
--- a/include/qpid/dispatch/router_core.h
+++ b/include/qpid/dispatch/router_core.h
@@ -541,8 +541,9 @@ qdr_delivery_t *qdr_link_deliver_to(qdr_link_t *link, 
qd_message_t *msg,
 qdr_delivery_t *qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t 
*msg, bool settled,
                                                 const uint8_t *tag, int 
tag_length,
                                                 uint64_t disposition, 
pn_data_t* disposition_state);
+qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *delivery);
 
-void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int 
credit);
+int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int 
credit);
 
 void qdr_link_flow(qdr_core_t *core, qdr_link_t *link, int credit, bool 
drain_mode);
 
@@ -589,6 +590,11 @@ void qdr_delivery_tag(const qdr_delivery_t *delivery, 
const char **tag, int *len
 qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery);
 qdr_error_t *qdr_delivery_error(const qdr_delivery_t *delivery);
 void qdr_delivery_write_extension_state(qdr_delivery_t *dlv, pn_delivery_t* 
pdlv, bool update_disposition);
+bool qdr_delivery_send_complete(const qdr_delivery_t *delivery);
+bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery);
+void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent);
+bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery);
+void qdr_delivery_set_cleared_proton_ref(qdr_delivery_t *dlv, bool 
cleared_proton_ref);
 
 /**
  ******************************************************************************

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/buffer.c
----------------------------------------------------------------------
diff --git a/src/buffer.c b/src/buffer.c
index 57f30c9..14cf229 100644
--- a/src/buffer.c
+++ b/src/buffer.c
@@ -24,17 +24,18 @@
 #include <string.h>
 
 
-static size_t buffer_size = 512;
-static int    size_locked = 0;
+size_t BUFFER_SIZE                 = 512;
+
+static int size_locked = 0;
 
 ALLOC_DECLARE(qd_buffer_t);
-ALLOC_DEFINE_CONFIG(qd_buffer_t, sizeof(qd_buffer_t), &buffer_size, 0);
+ALLOC_DEFINE_CONFIG(qd_buffer_t, sizeof(qd_buffer_t), &BUFFER_SIZE, 0);
 
 
 void qd_buffer_set_size(size_t size)
 {
     assert(!size_locked);
-    buffer_size = size;
+    BUFFER_SIZE = size;
 }
 
 
@@ -44,7 +45,8 @@ qd_buffer_t *qd_buffer(void)
     qd_buffer_t *buf = new_qd_buffer_t();
 
     DEQ_ITEM_INIT(buf);
-    buf->size = 0;
+    buf->size   = 0;
+    sys_atomic_init(&buf->bfanout, 0);
     return buf;
 }
 
@@ -70,7 +72,7 @@ unsigned char *qd_buffer_cursor(qd_buffer_t *buf)
 
 size_t qd_buffer_capacity(qd_buffer_t *buf)
 {
-    return buffer_size - buf->size;
+    return BUFFER_SIZE - buf->size;
 }
 
 
@@ -83,7 +85,26 @@ size_t qd_buffer_size(qd_buffer_t *buf)
 void qd_buffer_insert(qd_buffer_t *buf, size_t len)
 {
     buf->size += len;
-    assert(buf->size <= buffer_size);
+    assert(buf->size <= BUFFER_SIZE);
+}
+
+void qd_buffer_add_fanout(qd_buffer_t *buf)
+{
+    sys_atomic_inc(&buf->bfanout);
+}
+
+size_t qd_buffer_fanout(qd_buffer_t *buf)
+{
+    return buf->bfanout;
+}
+
+
+unsigned char *qd_buffer_at(qd_buffer_t *buf, size_t len)
+{
+    // If the len is greater than the buffer size, we might point to some 
garbage.
+    // We dont want that to happen, so do the assert.
+    assert(len <= BUFFER_SIZE);
+    return ((unsigned char*) &buf[1]) + len;
 }
 
 unsigned int qd_buffer_list_clone(qd_buffer_list_t *dst, const 
qd_buffer_list_t *src)

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/message.c
----------------------------------------------------------------------
diff --git a/src/message.c b/src/message.c
index 2dbe9ee..0788194 100644
--- a/src/message.c
+++ b/src/message.c
@@ -838,6 +838,13 @@ qd_message_t *qd_message()
     DEQ_INIT(msg->ma_trace);
     DEQ_INIT(msg->ma_ingress);
     msg->ma_phase = 0;
+    msg->ma_phase      = 0;
+    msg->sent_depth    = QD_DEPTH_NONE;
+    msg->cursor.buffer = 0;
+    msg->cursor.cursor = 0;
+    msg->send_complete = false;
+    msg->tag_sent      = false;
+
     msg->content = new_qd_message_content_t();
 
     if (msg->content == 0) {
@@ -913,6 +920,12 @@ qd_message_t *qd_message_copy(qd_message_t *in_msg)
 
     copy->content = content;
 
+    copy->sent_depth    = QD_DEPTH_NONE;
+    copy->cursor.buffer = 0;
+    copy->cursor.cursor = 0;
+    copy->send_complete = false;
+    copy->tag_sent      = false;
+
     qd_message_message_annotations((qd_message_t*) copy);
 
     sys_atomic_inc(&content->ref_count);
@@ -999,11 +1012,84 @@ void qd_message_set_ingress_annotation(qd_message_t 
*in_msg, qd_composed_field_t
     qd_compose_free(ingress_field);
 }
 
+bool qd_message_is_discard(qd_message_t *msg)
+{
+    if (!msg)
+        return false;
+    qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg;
+    return pvt_msg->content->discard;
+}
+
+void qd_message_set_discard(qd_message_t *msg, bool discard)
+{
+    if (!msg)
+        return;
+
+    qd_message_pvt_t *pvt_msg = (qd_message_pvt_t*) msg;
+    pvt_msg->content->discard = discard;
+}
+
+size_t qd_message_fanout(qd_message_t *in_msg)
+{
+    if (!in_msg)
+        return 0;
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    return msg->content->fanout;
+}
+
+void qd_message_add_fanout(qd_message_t *in_msg)
+{
+    assert(in_msg);
+    qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+    sys_atomic_inc(&msg->content->fanout);
+}
+
+bool qd_message_receive_complete(qd_message_t *in_msg)
+{
+    if (!in_msg)
+        return false;
+    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
+    return msg->content->receive_complete;
+}
+
+bool qd_message_send_complete(qd_message_t *in_msg)
+{
+    if (!in_msg)
+        return false;
+
+    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
+    return msg->send_complete;
+}
+
+bool qd_message_tag_sent(qd_message_t *in_msg)
+{
+    if (!in_msg)
+        return false;
+
+    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
+    return msg->tag_sent;
+}
+
+void qd_message_set_tag_sent(qd_message_t *in_msg, bool tag_sent)
+{
+    if (!in_msg)
+        return;
+
+    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
+    msg->tag_sent = tag_sent;
+}
+
+qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *in_msg)
+{
+    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
+    return msg->cursor;
+}
+
 qd_message_t *qd_message_receive(pn_delivery_t *delivery)
 {
     pn_link_t        *link = pn_delivery_link(delivery);
     ssize_t           rc;
-    qd_buffer_t      *buf;
+    qd_buffer_t      *buf = 0;
 
     pn_record_t *record    = pn_delivery_attachments(delivery);
     qd_message_pvt_t *msg  = (qd_message_pvt_t*) pn_record_get(record, 
PN_DELIVERY_CTX);
@@ -1023,27 +1109,48 @@ qd_message_t *qd_message_receive(pn_delivery_t 
*delivery)
     }
 
     //
+    // The discard flag indicates if we should continue receiving the message.
+    // This is pertinent in the case of large messages. When large messages 
are being received, we try to send out part of the
+    // message that has been received so far. If we not able to send it 
anywhere, there is no need to keep creating buffers
+    //
+    bool discard = qd_message_is_discard((qd_message_t*)msg);
+
+    //
     // Get a reference to the tail buffer on the message.  This is the buffer 
into which
-    // we will store incoming message data.  If there is no buffer in the 
message, allocate
-    // an empty one and add it to the message.
+    // we will store incoming message data.  If there is no buffer in the 
message, this is the
+    // first time we are here and we need to allocate an empty one and add it 
to the message.
     //
-    buf = DEQ_TAIL(msg->content->buffers);
-    if (!buf) {
-        buf = qd_buffer();
-        DEQ_INSERT_TAIL(msg->content->buffers, buf);
+    if (!discard) {
+        buf = DEQ_TAIL(msg->content->buffers);
+        if (!buf) {
+            buf = qd_buffer();
+            DEQ_INSERT_TAIL(msg->content->buffers, buf);
+        }
     }
 
     while (1) {
-        //
-        // Try to receive enough data to fill the remaining space in the tail 
buffer.
-        //
-        rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), 
qd_buffer_capacity(buf));
+        if (discard) {
+            char dummy[BUFFER_SIZE];
+            rc = pn_link_recv(link, dummy, BUFFER_SIZE);
+        }
+        else {
+            //
+            // Try to receive enough data to fill the remaining space in the 
tail buffer.
+            //
+
+            rc = pn_link_recv(link, (char*) qd_buffer_cursor(buf), 
qd_buffer_capacity(buf));
+        }
 
         //
         // If we receive PN_EOS, we have come to the end of the message.
         //
         if (rc == PN_EOS) {
             //
+            // We have received the entire message since rc == PN_EOS, set the 
receive_complete flag to true
+            //
+            msg->content->receive_complete = true;
+
+            //
             // Clear the value in the record with key PN_DELIVERY_CTX
             //
             pn_record_set(record, PN_DELIVERY_CTX, 0);
@@ -1053,9 +1160,10 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
             // will only happen if the size of the message content is an exact 
multiple
             // of the buffer size.
             //
-
-            if (qd_buffer_size(buf) == 0) {
+            if (buf && qd_buffer_size(buf) == 0) {
+                sys_mutex_lock(msg->content->lock);
                 DEQ_REMOVE_TAIL(msg->content->buffers);
+                sys_mutex_unlock(msg->content->lock);
                 qd_buffer_free(buf);
             }
 
@@ -1063,6 +1171,8 @@ qd_message_t *qd_message_receive(pn_delivery_t *delivery)
         }
 
         if (rc > 0) {
+            if (discard)
+                continue;
             //
             // We have received a positive number of bytes for the message.  
Advance
             // the cursor in the buffer.
@@ -1073,17 +1183,21 @@ qd_message_t *qd_message_receive(pn_delivery_t 
*delivery)
             // If the buffer is full, allocate a new empty buffer and append 
it to the
             // tail of the message's list.
             //
+            sys_mutex_lock(msg->content->lock);
             if (qd_buffer_capacity(buf) == 0) {
                 buf = qd_buffer();
                 DEQ_INSERT_TAIL(msg->content->buffers, buf);
             }
+            sys_mutex_unlock(msg->content->lock);
+
         } else
             //
             // We received zero bytes, and no PN_EOS.  This means that we've 
received
             // all of the data available up to this point, but it does not 
constitute
             // the entire message.  We'll be back later to finish it up.
+            // Return the message so that the caller can start sending out 
whatever we have received so far
             //
-            break;
+            return (qd_message_t*) msg;
     }
 
     return 0;
@@ -1226,98 +1340,160 @@ void qd_message_send(qd_message_t *in_msg,
 {
     qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
     qd_message_content_t *content = msg->content;
-    qd_buffer_t          *buf     = DEQ_HEAD(content->buffers);
-    unsigned char        *cursor;
+    qd_buffer_t          *buf     = 0;
     pn_link_t            *pnl     = qd_link_pn(link);
 
-    qd_buffer_list_t new_ma;
-    qd_buffer_list_t new_ma_trailer;
-    DEQ_INIT(new_ma);
-    DEQ_INIT(new_ma_trailer);
+    int                  fanout   = qd_message_fanout(in_msg);
 
-    // Process  the message annotations if any
-    compose_message_annotations(msg, &new_ma, &new_ma_trailer, 
strip_annotations);
+    if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) {
 
-    //
-    // Send header if present
-    //
-    cursor = qd_buffer_base(buf);
-    if (content->section_message_header.length > 0) {
-        buf    = content->section_message_header.buffer;
-        cursor = content->section_message_header.offset + qd_buffer_base(buf);
-        advance(&cursor, &buf,
-                content->section_message_header.length + 
content->section_message_header.hdr_length,
-                send_handler, (void*) pnl);
-    }
+        qd_buffer_list_t new_ma;
+        qd_buffer_list_t new_ma_trailer;
+        DEQ_INIT(new_ma);
+        DEQ_INIT(new_ma_trailer);
 
-    //
-    // Send delivery annotation if present
-    //
-    if (content->section_delivery_annotation.length > 0) {
-        buf    = content->section_delivery_annotation.buffer;
-        cursor = content->section_delivery_annotation.offset + 
qd_buffer_base(buf);
-        advance(&cursor, &buf,
-                content->section_delivery_annotation.length + 
content->section_delivery_annotation.hdr_length,
-                send_handler, (void*) pnl);
-    }
+        // Process  the message annotations if any
+        compose_message_annotations(msg, &new_ma, &new_ma_trailer, 
strip_annotations);
 
-    //
-    // Send new message annotations map start if any
-    //
-    qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
-    while (da_buf) {
-        char *to_send = (char*) qd_buffer_base(da_buf);
-        pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
-        da_buf = DEQ_NEXT(da_buf);
-    }
-    qd_buffer_list_free_buffers(&new_ma);
+        //
+        // Start with the very first buffer;
+        //
+        buf = DEQ_HEAD(content->buffers);
 
-    //
-    // Annotations possibly include an opaque blob of user annotations
-    //
-    if (content->field_user_annotations.length > 0) {
-        qd_buffer_t *buf2      = content->field_user_annotations.buffer;
-        unsigned char *cursor2 = content->field_user_annotations.offset + 
qd_buffer_base(buf);
-        advance(&cursor2, &buf2,
-                content->field_user_annotations.length,
-                send_handler, (void*) pnl);
-    }
 
-    //
-    // Annotations may include the v1 new_ma_trailer
-    //
-    qd_buffer_t *ta_buf = DEQ_HEAD(new_ma_trailer);
-    while (ta_buf) {
-        char *to_send = (char*) qd_buffer_base(ta_buf);
-        pn_link_send(pnl, to_send, qd_buffer_size(ta_buf));
-        ta_buf = DEQ_NEXT(ta_buf);
-    }
-    qd_buffer_list_free_buffers(&new_ma_trailer);
+        //
+        // Send header if present
+        //
+        unsigned char *cursor = qd_buffer_base(buf);
+        int header_consume = content->section_message_header.length + 
content->section_message_header.hdr_length;
+        if (content->section_message_header.length > 0) {
+            buf    = content->section_message_header.buffer;
+            cursor = content->section_message_header.offset + 
qd_buffer_base(buf);
+            advance(&cursor, &buf, header_consume, send_handler, (void*) pnl);
+        }
 
+        //
+        // Send delivery annotation if present
+        //
+        int da_consume = content->section_delivery_annotation.length + 
content->section_delivery_annotation.hdr_length;
+        if (content->section_delivery_annotation.length > 0) {
+            buf    = content->section_delivery_annotation.buffer;
+            cursor = content->section_delivery_annotation.offset + 
qd_buffer_base(buf);
+            advance(&cursor, &buf, da_consume, send_handler, (void*) pnl);
+        }
 
-    //
-    // Skip over replaced message annotations
-    //
-    if (content->section_message_annotation.length > 0)
-        advance(&cursor, &buf,
-                content->section_message_annotation.hdr_length + 
content->section_message_annotation.length,
-                0, 0);
+        //
+        // Send new message annotations map start if any
+        //
+        qd_buffer_t *da_buf = DEQ_HEAD(new_ma);
+        while (da_buf) {
+            char *to_send = (char*) qd_buffer_base(da_buf);
+            pn_link_send(pnl, to_send, qd_buffer_size(da_buf));
+            da_buf = DEQ_NEXT(da_buf);
+        }
+        qd_buffer_list_free_buffers(&new_ma);
+
+        //
+        // Annotations possibly include an opaque blob of user annotations
+        //
+        if (content->field_user_annotations.length > 0) {
+            qd_buffer_t *buf2      = content->field_user_annotations.buffer;
+            unsigned char *cursor2 = content->field_user_annotations.offset + 
qd_buffer_base(buf);
+            advance(&cursor2, &buf2,
+                    content->field_user_annotations.length,
+                    send_handler, (void*) pnl);
+        }
+
+        //
+        // Annotations may include the v1 new_ma_trailer
+        //
+        qd_buffer_t *ta_buf = DEQ_HEAD(new_ma_trailer);
+        while (ta_buf) {
+            char *to_send = (char*) qd_buffer_base(ta_buf);
+            pn_link_send(pnl, to_send, qd_buffer_size(ta_buf));
+            ta_buf = DEQ_NEXT(ta_buf);
+        }
+        qd_buffer_list_free_buffers(&new_ma_trailer);
+
+
+        //
+        // Skip over replaced message annotations
+        //
+        int ma_consume = content->section_message_annotation.hdr_length + 
content->section_message_annotation.length;
+        if (content->section_message_annotation.length > 0)
+            advance(&cursor, &buf, ma_consume, 0, 0);
+
+
+        msg->cursor.buffer = buf;
+
+        //
+        // If this message has no header and no delivery annotations and no 
message annotations, set the offset to 0.
+        //
+        if (header_consume == 0 && da_consume == 0 && ma_consume ==0)
+            msg->cursor.cursor = qd_buffer_base(buf);
+        else
+            msg->cursor.cursor = cursor;
+
+        msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS;
 
-    //
-    // Send remaining partial buffer
-    //
-    if (buf) {
-        size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf));
-        advance(&cursor, &buf, len, send_handler, (void*) pnl);
     }
 
-    // Fall through to process the remaining buffers normally
-    // Note that 'advance' will have moved us to the next buffer in the chain.
+    buf = msg->cursor.buffer;
 
+    if (!buf)
+        return;
 
     while (buf) {
-        pn_link_send(pnl, (char*) qd_buffer_base(buf), qd_buffer_size(buf));
-        buf = DEQ_NEXT(buf);
+        size_t buf_size = qd_buffer_size(buf);
+
+        // This will send the remaining data in the buffer if any.
+        int num_bytes_to_send = buf_size - (msg->cursor.cursor - 
qd_buffer_base(buf));
+        if (num_bytes_to_send > 0)
+            pn_link_send(pnl, (const char*)msg->cursor.cursor, 
num_bytes_to_send);
+
+        // If the entire message has already been received,  taking out this 
lock is not that expensive
+        // because there is no contention for this lock.
+        sys_mutex_lock(msg->content->lock);
+
+        qd_buffer_t *next_buf = DEQ_NEXT(buf);
+        if (next_buf) {
+            // There is a next buffer, the previous buffer has been fully sent 
by now.
+            qd_buffer_add_fanout(buf);
+            if (fanout == qd_buffer_fanout(buf)) {
+                qd_buffer_t *local_buf = DEQ_HEAD(content->buffers);
+                while (local_buf && local_buf != next_buf) {
+                    DEQ_REMOVE_HEAD(content->buffers);
+                    qd_buffer_free(local_buf);
+                    local_buf = DEQ_HEAD(content->buffers);
+                }
+            }
+            msg->cursor.buffer = next_buf;
+            msg->cursor.cursor = qd_buffer_base(next_buf);
+        }
+        else {
+            if (qd_message_receive_complete(in_msg)) {
+                //
+                // There is no next_buf and there is no more of the message 
coming, this means
+                // that we have completely sent out the message.
+                //
+                msg->send_complete = true;
+                msg->cursor.buffer = 0;
+                msg->cursor.cursor = 0;
+
+            }
+            else {
+                //
+                // There is more of the message to come, update your cursor 
pointers
+                // you will come back into this function to deliver more as 
bytes arrive
+                //
+                msg->cursor.buffer = buf;
+                msg->cursor.cursor = qd_buffer_at(buf, buf_size);
+            }
+        }
+
+        sys_mutex_unlock(msg->content->lock);
+
+        buf = next_buf;
     }
 }
 
@@ -1546,6 +1722,7 @@ void qd_message_compose_1(qd_message_t *msg, const char 
*to, qd_buffer_list_t *b
 {
     qd_composed_field_t  *field   = qd_compose(QD_PERFORMATIVE_HEADER, 0);
     qd_message_content_t *content = MSG_CONTENT(msg);
+    content->receive_complete     = true;
 
     qd_compose_start_list(field);
     qd_compose_insert_bool(field, 0);     // durable
@@ -1594,6 +1771,8 @@ void qd_message_compose_1(qd_message_t *msg, const char 
*to, qd_buffer_list_t *b
 void qd_message_compose_2(qd_message_t *msg, qd_composed_field_t *field)
 {
     qd_message_content_t *content       = MSG_CONTENT(msg);
+    content->receive_complete     = true;
+
     qd_buffer_list_t     *field_buffers = qd_compose_buffers(field);
 
     content->buffers = *field_buffers;
@@ -1604,6 +1783,7 @@ void qd_message_compose_2(qd_message_t *msg, 
qd_composed_field_t *field)
 void qd_message_compose_3(qd_message_t *msg, qd_composed_field_t *field1, 
qd_composed_field_t *field2)
 {
     qd_message_content_t *content        = MSG_CONTENT(msg);
+    content->receive_complete     = true;
     qd_buffer_list_t     *field1_buffers = qd_compose_buffers(field1);
     qd_buffer_list_t     *field2_buffers = qd_compose_buffers(field2);
 

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/message_private.h
----------------------------------------------------------------------
diff --git a/src/message_private.h b/src/message_private.h
index a3b459b..073ac7f 100644
--- a/src/message_private.h
+++ b/src/message_private.h
@@ -94,7 +94,6 @@ typedef struct {
     qd_buffer_t         *parse_buffer;
     unsigned char       *parse_cursor;
     qd_message_depth_t   parse_depth;
-
     bool                 ma_parsed;                       // have parsed 
annotations in incoming message
     qd_iterator_t       *ma_field_iter_in;                // 'message field 
iterator' for msg.FIELD_MESSAGE_ANNOTATION
 
@@ -107,11 +106,21 @@ typedef struct {
     qd_parsed_field_t   *ma_pf_to_override;
     qd_parsed_field_t   *ma_pf_trace;
     int                  ma_int_phase;
+    //qd_parsed_field_t   *parsed_message_annotations;
+
+    bool                 discard;                        // Should this 
message be discarded?
+    bool                 receive_complete;               // true if the 
message has been completely received, false otherwise
+    sys_atomic_t         fanout;                         // The number of 
receivers for this message. This number does not include in-process subscribers.
 } qd_message_content_t;
 
 typedef struct {
     DEQ_LINKS(qd_message_t);   // Deque linkage that overlays the qd_message_t
-    qd_message_content_t *content;
+    qd_iterator_pointer_t cursor;          // A pointer to the current 
location of the outgoing byte stream.
+    qd_message_depth_t    message_depth;   // What is the depth of the message 
that has been received so far
+    qd_message_depth_t    sent_depth;      // How much of the message has been 
sent?  QD_DEPTH_NONE means nothing has been sent so far, QD_DEPTH_HEADER means 
the header has already been sent, dont send it again and so on.
+    bool                  send_complete;   // Has the message been completely 
received and completely sent?
+    bool                  tag_sent;        // Tags are sent
+    qd_message_content_t *content;         // The actual content of the 
message. The content is never copied
     qd_buffer_list_t      ma_to_override;  // to field in outgoing message 
annotations.
     qd_buffer_list_t      ma_trace;        // trace list in outgoing message 
annotations
     qd_buffer_list_t      ma_ingress;      // ingress field in outgoing 
message annotations
@@ -127,6 +136,8 @@ ALLOC_DECLARE(qd_message_content_t);
 /** Initialize logging */
 void qd_message_initialize();
 
+qd_iterator_pointer_t qd_message_cursor(qd_message_pvt_t *msg);
+
 ///@}
 
 #endif

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/router_core/connections.c
----------------------------------------------------------------------
diff --git a/src/router_core/connections.c b/src/router_core/connections.c
index f9638bf..264068d 100644
--- a/src/router_core/connections.c
+++ b/src/router_core/connections.c
@@ -238,6 +238,7 @@ int qdr_connection_process(qdr_connection_t *conn)
         if (ref) {
             link = ref->link;
             qdr_del_link_ref(&conn->links_with_work, ref->link, 
QDR_LINK_LIST_CLASS_WORK);
+
             link_work = DEQ_HEAD(link->work_list);
             if (link_work) {
                 DEQ_REMOVE_HEAD(link->work_list);
@@ -611,11 +612,13 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, 
qdr_connection_t *conn, qdr_li
     qdr_delivery_ref_list_t updated_deliveries;
     qdr_delivery_list_t     undelivered;
     qdr_delivery_list_t     unsettled;
+    qdr_delivery_list_t     settled;
     qdr_link_work_list_t    work_list;
 
     sys_mutex_lock(conn->work_lock);
     DEQ_MOVE(link->work_list, work_list);
     DEQ_MOVE(link->updated_deliveries, updated_deliveries);
+
     DEQ_MOVE(link->undelivered, undelivered);
     qdr_delivery_t *d = DEQ_HEAD(undelivered);
     while (d) {
@@ -631,6 +634,14 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, 
qdr_connection_t *conn, qdr_li
         d->where = QDR_DELIVERY_NOWHERE;
         d = DEQ_NEXT(d);
     }
+
+    DEQ_MOVE(link->settled, settled);
+    d = DEQ_HEAD(settled);
+    while (d) {
+        assert(d->where == QDR_DELIVERY_IN_SETTLED);
+        d->where = QDR_DELIVERY_NOWHERE;
+        d = DEQ_NEXT(d);
+    }
     sys_mutex_unlock(conn->work_lock);
 
     //
@@ -736,6 +747,31 @@ static void qdr_link_cleanup_CT(qdr_core_t *core, 
qdr_connection_t *conn, qdr_li
         dlv = DEQ_HEAD(unsettled);
     }
 
+    //Free/unlink/decref the settled deliveries.
+    dlv = DEQ_HEAD(settled);
+    while (dlv) {
+        DEQ_REMOVE_HEAD(settled);
+        peer = qdr_delivery_first_peer_CT(dlv);
+        qdr_delivery_t *next_peer = 0;
+        while (peer) {
+            next_peer = qdr_delivery_next_peer_CT(dlv);
+            qdr_delivery_unlink_peers_CT(core, dlv, peer);
+            peer = next_peer;
+        }
+
+        //
+        // Account for the lost reference from the Proton delivery
+        //
+        if (!dlv->cleared_proton_ref) {
+            dlv->cleared_proton_ref = true;
+            qdr_delivery_decref_CT(core, dlv);
+        }
+
+        // This decref is for the removing the delivery from the settled list
+        qdr_delivery_decref_CT(core, dlv);
+        dlv = DEQ_HEAD(settled);
+    }
+
     //
     // Remove the reference to this link in the connection's reference lists
     //

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/router_core/forwarder.c
----------------------------------------------------------------------
diff --git a/src/router_core/forwarder.c b/src/router_core/forwarder.c
index e31a97a..232868d 100644
--- a/src/router_core/forwarder.c
+++ b/src/router_core/forwarder.c
@@ -115,9 +115,15 @@ qdr_delivery_t *qdr_forward_new_delivery_CT(qdr_core_t 
*core, qdr_delivery_t *in
     out_dlv->error      = 0;
 
     //
-    // Create peer linkage only if the outgoing delivery is not settled
+    // Add one to the message fanout. This will later be used in the 
qd_message_send function that sends out messages.
     //
-    if (!out_dlv->settled)
+    qd_message_add_fanout(msg);
+
+    //
+    // Create peer linkage if the outgoing delivery is unsettled. This peer 
linkage is necessary to deal with dispositions that show up in the future.
+    // Also create peer linkage if the message is not yet been completely 
received. This linkage will help us stream large pre-settled multicast messages.
+    //
+    if (!out_dlv->settled || !qd_message_receive_complete(msg))
         qdr_delivery_link_peers_CT(in_dlv, out_dlv);
 
     return out_dlv;
@@ -163,28 +169,30 @@ static void qdr_forward_drop_presettled_CT_LH(qdr_core_t 
*core, qdr_link_t *link
 }
 
 
-void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *link, qdr_delivery_t 
*dlv)
+void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t *out_link, 
qdr_delivery_t *out_dlv)
 {
-    sys_mutex_lock(link->conn->work_lock);
+    sys_mutex_lock(out_link->conn->work_lock);
 
     //
     // If the delivery is pre-settled and the outbound link is at or above 
capacity,
     // discard all pre-settled deliveries on the undelivered list prior to 
enqueuing
     // the new delivery.
     //
-    if (dlv->settled && link->capacity > 0 && DEQ_SIZE(link->undelivered) >= 
link->capacity)
-        qdr_forward_drop_presettled_CT_LH(core, link);
+    if (out_dlv->settled && out_link->capacity > 0 && 
DEQ_SIZE(out_link->undelivered) >= out_link->capacity)
+        qdr_forward_drop_presettled_CT_LH(core, out_link);
 
-    DEQ_INSERT_TAIL(link->undelivered, dlv);
-    dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
-    qdr_delivery_incref(dlv);
+    DEQ_INSERT_TAIL(out_link->undelivered, out_dlv);
+    out_dlv->where = QDR_DELIVERY_IN_UNDELIVERED;
+
+    // This incref is for putting the delivery in the undelivered list
+    qdr_delivery_incref(out_dlv);
 
     //
     // We must put a work item on the link's work list to represent this 
pending delivery.
     // If there's already a delivery item on the tail of the work list, simply 
join that item
     // by incrementing the value.
     //
-    qdr_link_work_t *work = DEQ_TAIL(link->work_list);
+    qdr_link_work_t *work = DEQ_TAIL(out_link->work_list);
     if (work && work->work_type == QDR_LINK_WORK_DELIVERY) {
         work->value++;
     } else {
@@ -192,16 +200,16 @@ void qdr_forward_deliver_CT(qdr_core_t *core, qdr_link_t 
*link, qdr_delivery_t *
         ZERO(work);
         work->work_type = QDR_LINK_WORK_DELIVERY;
         work->value     = 1;
-        DEQ_INSERT_TAIL(link->work_list, work);
-        qdr_add_link_ref(&link->conn->links_with_work, link, 
QDR_LINK_LIST_CLASS_WORK);
+        DEQ_INSERT_TAIL(out_link->work_list, work);
     }
-    dlv->link_work = work;
-    sys_mutex_unlock(link->conn->work_lock);
+    qdr_add_link_ref(&out_link->conn->links_with_work, out_link, 
QDR_LINK_LIST_CLASS_WORK);
+    out_dlv->link_work = work;
+    sys_mutex_unlock(out_link->conn->work_lock);
 
     //
     // Activate the outgoing connection for later processing.
     //
-    qdr_connection_activate_CT(core, link->conn);
+    qdr_connection_activate_CT(core, out_link->conn);
 }
 
 
@@ -235,6 +243,7 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     int           fanout               = 0;
     qd_bitmask_t *link_exclusion       = !!in_delivery ? 
in_delivery->link_exclusion : 0;
     bool          presettled           = !!in_delivery ? in_delivery->settled 
: true;
+    bool          receive_complete     = 
qd_message_receive_complete(qdr_delivery_message(in_delivery));
 
     //
     // If the delivery is not presettled, set the settled flag for forwarding 
so all
@@ -245,7 +254,6 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
     //
     if (!presettled) {
         in_delivery->settled = true;
-
         //
         // If the router is configured to reject unsettled multicasts, settle 
and reject this delivery.
         //
@@ -351,9 +359,23 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         //
         // Forward to in-process subscribers
         //
+
         qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
         while (sub) {
-            qdr_forward_on_message_CT(core, sub, in_delivery ? 
in_delivery->link : 0, msg);
+            //
+            // Only if the message has been completely received, forward it to 
the subscription
+            // Subscriptions, at the moment, dont have the ability to deal 
with partial messages
+            //
+            if (receive_complete)
+                qdr_forward_on_message_CT(core, sub, in_delivery ? 
in_delivery->link : 0, msg);
+            else
+                //
+                // Receive is not complete, we will store the sub in 
in_delivery->subscriptions so we can send the message to the subscription
+                // after the message fully arrives
+                //
+                DEQ_INSERT_TAIL(in_delivery->subscriptions, sub);
+
+
             fanout++;
             addr->deliveries_to_container++;
             sub = DEQ_NEXT(sub);
@@ -370,10 +392,13 @@ int qdr_forward_multicast_CT(qdr_core_t      *core,
         else {
             //
             // The delivery was not presettled and it was forwarded to at least
-            // one destination.  Accept and settle the delivery.
+            // one destination.  Accept and settle the delivery only if the 
entire delivery
+            // has been received.
             //
-            in_delivery->disposition = PN_ACCEPTED;
-            qdr_delivery_push_CT(core, in_delivery);
+            if (receive_complete) {
+                in_delivery->disposition = PN_ACCEPTED;
+                qdr_delivery_push_CT(core, in_delivery);
+            }
         }
     }
 
@@ -395,9 +420,22 @@ int qdr_forward_closest_CT(qdr_core_t      *core,
     // Forward to an in-process subscriber if there is one.
     //
     if (!exclude_inprocess) {
+        bool receive_complete = qd_message_receive_complete(msg);
         qdr_subscription_t *sub = DEQ_HEAD(addr->subscriptions);
         if (sub) {
-            qdr_forward_on_message_CT(core, sub, in_delivery ? 
in_delivery->link : 0, msg);
+
+            //
+            // Only if the message has been completely received, forward it.
+            // Subscriptions, at the moment, dont have the ability to deal 
with partial messages
+            //
+            if (receive_complete)
+                qdr_forward_on_message_CT(core, sub, in_delivery ? 
in_delivery->link : 0, msg);
+            else
+                //
+                // Receive is not complete, we will store the sub in 
in_delivery->subscriptions so we can send the message to the subscription
+                // after the message fully arrives
+                //
+                DEQ_INSERT_TAIL(in_delivery->subscriptions, sub);
 
             //
             // If the incoming delivery is not settled, it should be accepted 
and settled here.

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/router_core/router_core.c
----------------------------------------------------------------------
diff --git a/src/router_core/router_core.c b/src/router_core/router_core.c
index 510b427..010ad98 100644
--- a/src/router_core/router_core.c
+++ b/src/router_core/router_core.c
@@ -308,6 +308,15 @@ qdr_address_t *qdr_add_local_address_CT(qdr_core_t *core, 
char aclass, const cha
     return addr;
 }
 
+bool qdr_is_addr_treatment_multicast(qdr_address_t *addr)
+{
+    if (addr) {
+        if (addr->treatment == QD_TREATMENT_MULTICAST_FLOOD || addr->treatment 
== QD_TREATMENT_MULTICAST_ONCE)
+            return true;
+    }
+    return false;
+}
+
 void qdr_core_remove_address(qdr_core_t *core, qdr_address_t *addr)
 {
     // Remove the address from the list and hash index

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/router_core/router_core_private.h
----------------------------------------------------------------------
diff --git a/src/router_core/router_core_private.h 
b/src/router_core/router_core_private.h
index da36b14..052ed7f 100644
--- a/src/router_core/router_core_private.h
+++ b/src/router_core/router_core_private.h
@@ -300,7 +300,8 @@ DEQ_DECLARE(qdr_router_ref_t, qdr_router_ref_list_t);
 typedef enum {
     QDR_DELIVERY_NOWHERE = 0,
     QDR_DELIVERY_IN_UNDELIVERED,
-    QDR_DELIVERY_IN_UNSETTLED
+    QDR_DELIVERY_IN_UNSETTLED,
+    QDR_DELIVERY_IN_SETTLED
 } qdr_delivery_where_t;
 
 typedef struct qdr_delivery_ref_t {
@@ -311,6 +312,17 @@ typedef struct qdr_delivery_ref_t {
 ALLOC_DECLARE(qdr_delivery_ref_t);
 DEQ_DECLARE(qdr_delivery_ref_t, qdr_delivery_ref_list_t);
 
+struct qdr_subscription_t {
+    DEQ_LINKS(qdr_subscription_t);
+    qdr_core_t    *core;
+    qdr_address_t *addr;
+    qdr_receive_t  on_message;
+    void          *on_message_context;
+};
+
+DEQ_DECLARE(qdr_subscription_t, qdr_subscription_list_t);
+
+
 struct qdr_delivery_t {
     DEQ_LINKS(qdr_delivery_t);
     void                   *context;
@@ -334,6 +346,7 @@ struct qdr_delivery_t {
     qdr_address_t          *tracking_addr;
     int                     tracking_addr_bit;
     qdr_link_work_t        *link_work;         ///< Delivery work item for 
this delivery
+    qdr_subscription_list_t subscriptions;
     qdr_delivery_ref_list_t peers;             /// Use this list if there if 
the delivery has more than one peer.
 
 };
@@ -375,6 +388,7 @@ struct qdr_link_t {
     qdr_auto_link_t         *auto_link;          ///< [ref] Auto_link that 
owns this link
     qdr_delivery_list_t      undelivered;        ///< Deliveries to be 
forwarded or sent
     qdr_delivery_list_t      unsettled;          ///< Unsettled deliveries
+    qdr_delivery_list_t      settled;            ///< Settled deliveries
     qdr_delivery_ref_list_t  updated_deliveries; ///< References to deliveries 
(in the unsettled list) with updates.
     bool                     admin_enabled;
     qdr_link_oper_status_t   oper_status;
@@ -419,18 +433,6 @@ DEQ_DECLARE(qdr_connection_ref_t, 
qdr_connection_ref_list_t);
 void qdr_add_connection_ref(qdr_connection_ref_list_t *ref_list, 
qdr_connection_t *conn);
 void qdr_del_connection_ref(qdr_connection_ref_list_t *ref_list, 
qdr_connection_t *conn);
 
-
-struct qdr_subscription_t {
-    DEQ_LINKS(qdr_subscription_t);
-    qdr_core_t    *core;
-    qdr_address_t *addr;
-    qdr_receive_t  on_message;
-    void          *on_message_context;
-};
-
-DEQ_DECLARE(qdr_subscription_t, qdr_subscription_list_t);
-
-
 struct qdr_address_t {
     DEQ_LINKS(qdr_address_t);
     qdr_subscription_list_t    subscriptions; ///< In-process message 
subscribers
@@ -489,7 +491,7 @@ struct qdr_address_config_t {
 ALLOC_DECLARE(qdr_address_config_t);
 DEQ_DECLARE(qdr_address_config_t, qdr_address_config_list_t);
 void qdr_core_remove_address_config(qdr_core_t *core, qdr_address_config_t 
*addr);
-
+bool qdr_is_addr_treatment_multicast(qdr_address_t *addr);
 
 //
 // Connection Information
@@ -699,6 +701,7 @@ void qdr_delivery_release_CT(qdr_core_t *core, 
qdr_delivery_t *delivery);
 void qdr_delivery_failed_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 bool qdr_delivery_settled_CT(qdr_core_t *core, qdr_delivery_t *delivery);
 void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *delivery);
+void qdr_forward_on_message_CT(qdr_core_t *core, qdr_subscription_t *sub, 
qdr_link_t *link, qd_message_t *msg);
 
 /**
  * Links the in_dlv to the out_dlv and increments ref counts of both deliveries

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/router_core/transfer.c
----------------------------------------------------------------------
diff --git a/src/router_core/transfer.c b/src/router_core/transfer.c
index 9453996..4198262 100644
--- a/src/router_core/transfer.c
+++ b/src/router_core/transfer.c
@@ -26,6 +26,7 @@ static void qdr_link_flow_CT(qdr_core_t *core, qdr_action_t 
*action, bool discar
 static void qdr_send_to_CT(qdr_core_t *core, qdr_action_t *action, bool 
discard);
 static void qdr_update_delivery_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
 static void qdr_delete_delivery_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
+static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard);
 
 
//==================================================================================
 // Internal Functions
@@ -114,40 +115,82 @@ qdr_delivery_t 
*qdr_link_deliver_to_routed_link(qdr_link_t *link, qd_message_t *
 }
 
 
-void qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int 
credit)
+qdr_delivery_t *qdr_deliver_continue(qdr_delivery_t *in_dlv)
+{
+    qdr_action_t   *action = qdr_action(qdr_deliver_continue_CT, 
"deliver_continue");
+    action->args.connection.delivery = in_dlv;
+
+    // This incref is for the action reference
+    qdr_delivery_incref(in_dlv);
+    qdr_action_enqueue(in_dlv->link->core, action);
+    return in_dlv;
+}
+
+
+int qdr_link_process_deliveries(qdr_core_t *core, qdr_link_t *link, int credit)
 {
     qdr_connection_t *conn = link->conn;
     qdr_delivery_t   *dlv;
     bool              drained = false;
     int               offer   = -1;
     bool              settled = false;
+    bool              send_complete = false;
+    int               num_deliveries_completed = 0;
 
     if (link->link_direction == QD_OUTGOING) {
         while (credit > 0 && !drained) {
             sys_mutex_lock(conn->work_lock);
             dlv = DEQ_HEAD(link->undelivered);
-            if (dlv) {
-                DEQ_REMOVE_HEAD(link->undelivered);
-                dlv->link_work = 0;
-                settled = dlv->settled;
-                if (!settled) {
-                    DEQ_INSERT_TAIL(link->unsettled, dlv);
-                    dlv->where = QDR_DELIVERY_IN_UNSETTLED;
-                } else
-                    dlv->where = QDR_DELIVERY_NOWHERE;
-
-                credit--;
-                link->total_deliveries++;
-                offer = DEQ_SIZE(link->undelivered);
-            } else
-                drained = true;
             sys_mutex_unlock(conn->work_lock);
-
             if (dlv) {
-                link->credit_to_core--;
+                settled = dlv->settled;
                 core->deliver_handler(core->user_context, link, dlv, settled);
-                if (settled)
-                    qdr_delivery_decref(core, dlv);
+                sys_mutex_lock(conn->work_lock);
+                send_complete = qdr_delivery_send_complete(dlv);
+                if (send_complete) {
+                    //
+                    // The entire message has been sent. It is now the 
appropriate time to have the delivery removed
+                    // from the head of the undelivered list and move it to 
the unsettled list if it is not settled.
+                    //
+                    DEQ_REMOVE_HEAD(link->undelivered);
+                    num_deliveries_completed ++;
+                    dlv->link_work = 0;
+
+                    if (settled) {
+                        dlv->where = QDR_DELIVERY_NOWHERE;
+
+                        // This decref is for removing this settled delivery 
from the undelivered list
+                        qdr_delivery_decref(core, dlv);
+
+                    } else {
+                        DEQ_INSERT_TAIL(link->unsettled, dlv);
+                        dlv->where = QDR_DELIVERY_IN_UNSETTLED;
+                    }
+
+                    credit--;
+                    link->credit_to_core--;
+                    link->total_deliveries++;
+                    offer = DEQ_SIZE(link->undelivered);
+                }
+                else {
+                    //
+                    // The message is still being received/sent.
+                    // 1. We cannot remove the delivery from the undelivered 
list.
+                    // This delivery needs to stay at the head of the 
undelivered list until the entire message has been sent out i.e other 
deliveries in the
+                    // undelivered list have to wait before this entire large 
delivery is sent out
+                    // 2. We need to call deliver_handler so any newly arrived 
bytes can be pushed out
+                    // 3. We need to break out of this loop otherwise a thread 
will keep spinning in here until the entire message has been sent out.
+                    //
+                    sys_mutex_unlock(conn->work_lock);
+
+                    //
+                    // Note here that we are not incrementing 
num_deliveries_processed. Since this delivery is still coming in or still being 
sent out,
+                    // we cannot consider this delivery as fully processed.
+                    //
+                    return num_deliveries_completed;
+                }
+                sys_mutex_unlock(conn->work_lock);
+
             }
         }
 
@@ -156,6 +199,8 @@ void qdr_link_process_deliveries(qdr_core_t *core, 
qdr_link_t *link, int credit)
         else if (offer != -1)
             core->offer_handler(core->user_context, link, offer);
     }
+
+    return num_deliveries_completed;
 }
 
 
@@ -234,12 +279,49 @@ void qdr_delivery_set_context(qdr_delivery_t *delivery, 
void *context)
     delivery->context = context;
 }
 
+void qdr_delivery_set_cleared_proton_ref(qdr_delivery_t *dlv, bool 
cleared_proton_ref)
+{
+    dlv->cleared_proton_ref = cleared_proton_ref;
+}
+
 
 void *qdr_delivery_get_context(qdr_delivery_t *delivery)
 {
     return delivery->context;
 }
 
+
+bool qdr_delivery_send_complete(const qdr_delivery_t *delivery)
+{
+    if (!delivery)
+        return false;
+    return qd_message_send_complete(delivery->msg);
+}
+
+bool qdr_delivery_tag_sent(const qdr_delivery_t *delivery)
+{
+    if (!delivery)
+        return false;
+    return qd_message_tag_sent(delivery->msg);
+}
+
+void qdr_delivery_set_tag_sent(const qdr_delivery_t *delivery, bool tag_sent)
+{
+    if (!delivery)
+        return;
+
+    qd_message_set_tag_sent(delivery->msg, tag_sent);
+}
+
+
+bool qdr_delivery_receive_complete(const qdr_delivery_t *delivery)
+{
+    if (!delivery)
+        return false;
+    return qd_message_receive_complete(delivery->msg);
+}
+
+
 void qdr_delivery_incref(qdr_delivery_t *delivery)
 {
     sys_atomic_inc(&delivery->ref_count);
@@ -272,6 +354,8 @@ void qdr_delivery_tag(const qdr_delivery_t *delivery, const 
char **tag, int *len
 
 qd_message_t *qdr_delivery_message(const qdr_delivery_t *delivery)
 {
+    if (!delivery)
+        return 0;
     return delivery->msg;
 }
 
@@ -417,9 +501,19 @@ static void qdr_delete_delivery_internal_CT(qdr_core_t 
*core, qdr_delivery_t *de
             link->modified_deliveries++;
     }
 
+    //
+    // Free all the peer qdr_delivery_ref_t references
+    //
+    qdr_delivery_ref_t *ref = DEQ_HEAD(delivery->peers);
+    while (ref) {
+        qdr_del_delivery_ref(&delivery->peers, ref);
+        ref = DEQ_HEAD(delivery->peers);
+    }
+
     qd_bitmask_free(delivery->link_exclusion);
     qdr_error_free(delivery->error);
     free_qdr_delivery_t(delivery);
+
 }
 
 static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv)
@@ -427,7 +521,6 @@ static bool qdr_delivery_has_peer_CT(qdr_delivery_t *dlv)
     return dlv->peer || DEQ_SIZE(dlv->peers) > 0;
 }
 
-
 void qdr_delivery_link_peers_CT(qdr_delivery_t *in_dlv, qdr_delivery_t 
*out_dlv)
 {
     // If there is no delivery or a peer, we cannot link each other.
@@ -466,17 +559,38 @@ void qdr_delivery_unlink_peers_CT(qdr_core_t *core, 
qdr_delivery_t *dlv, qdr_del
     // If there is no delivery or a peer, we cannot proceed.
     if (!dlv || !peer)
         return;
-    //
-    // Make sure that the passed in deliveries are indeed peers.
-    //
-    assert(dlv->peer == peer);
-    assert(peer->peer == dlv);
-
-    dlv->peer  = 0;
-    peer->peer = 0;
 
-    qdr_delivery_decref_CT(core, dlv);
-    qdr_delivery_decref_CT(core, peer);
+    if (dlv->peer) {
+        //
+        // This is the easy case. One delivery has only one peer. we can simply
+        // zero them out and directly decref.
+        //
+        assert(dlv->peer == peer);
+        dlv->peer  = 0;
+        peer->peer = 0;
+        qdr_delivery_decref_CT(core, dlv);
+        qdr_delivery_decref_CT(core, peer);
+    }
+    else {
+        //
+        // The dlv has more than one peer. We are going to find the peer of 
dlv that match with the passed in peer
+        // and delete that peer.
+        //
+        qdr_delivery_ref_t *dlv_ref = DEQ_HEAD(dlv->peers);
+        while (dlv_ref) {
+            qdr_delivery_t * peer_dlv = dlv_ref->dlv;
+            if (peer_dlv == peer) {
+                if (peer->peer)  {
+                    peer->peer = 0;
+                    qdr_delivery_decref_CT(core, dlv);
+                }
+                qdr_del_delivery_ref(&dlv->peers, dlv_ref);
+                qdr_delivery_decref_CT(core, peer);
+                break;
+            }
+            dlv_ref = DEQ_NEXT(dlv_ref);
+        }
+    }
 }
 
 
@@ -524,6 +638,7 @@ qdr_delivery_t *qdr_delivery_next_peer_CT(qdr_delivery_t 
*dlv)
 void qdr_delivery_decref_CT(qdr_core_t *core, qdr_delivery_t *dlv)
 {
     uint32_t ref_count = sys_atomic_dec(&dlv->ref_count);
+
     assert(ref_count > 0);
 
     if (ref_count == 1)
@@ -610,6 +725,7 @@ static long qdr_addr_path_count_CT(qdr_address_t *addr)
 
 static void qdr_link_forward_CT(qdr_core_t *core, qdr_link_t *link, 
qdr_delivery_t *dlv, qdr_address_t *addr)
 {
+    bool receive_complete = 
qd_message_receive_complete(qdr_delivery_message(dlv));
     if (addr && addr == link->owning_addr && qdr_addr_path_count_CT(addr) == 
0) {
         //
         // We are trying to forward a delivery on an address that has no 
outbound paths
@@ -658,23 +774,48 @@ static void qdr_link_forward_CT(qdr_core_t *core, 
qdr_link_t *link, qdr_delivery
         //
         // If the delivery is not settled, release it.
         //
-        if (!dlv->settled)
+        if (!dlv->settled) {
             qdr_delivery_release_CT(core, dlv);
+
+            //
+            // Set the discard flag on the message only if the message is not 
completely received yet.
+            //
+            if (!receive_complete)
+                qd_message_set_discard(dlv->msg, true);
+        }
+
+        //
+        // Decrementing the delivery ref count for the action
+        //
         qdr_delivery_decref_CT(core, dlv);
         qdr_link_issue_credit_CT(core, link, 1, false);
     } else if (fanout > 0) {
-        if (dlv->settled) {
+        if (dlv->settled || qdr_is_addr_treatment_multicast(addr)) {
             //
             // The delivery is settled.  Keep it off the unsettled list and 
issue
             // replacement credit for it now.
             //
             qdr_link_issue_credit_CT(core, link, 1, false);
+            if (receive_complete) {
+                //
+                // This decref is for the action ref
+                //
+                qdr_delivery_decref_CT(core, dlv);
+            }
+            else {
+                //
+                // The message is still coming through since receive_complete 
is false. We have to put this delivery in the settled list.
+                // We need to do this because we have linked this delivery to 
a peer.
+                // If this connection goes down, we will have to unlink peer 
so that peer knows that its peer is not-existent anymore
+                // and need to tell the other side that the message has been 
aborted.
+                //
 
-            //
-            // If the delivery has no more references, free it now.
-            //
-            assert(!dlv->peer);
-            qdr_delivery_decref_CT(core, dlv);
+                //
+                // Again, don't bother decrementing then incrementing the 
ref_count, we are still using the action ref count
+                //
+                DEQ_INSERT_TAIL(link->settled, dlv);
+                dlv->where = QDR_DELIVERY_IN_SETTLED;
+            }
         } else {
             //
             // Again, don't bother decrementing then incrementing the ref_count
@@ -703,13 +844,15 @@ static void qdr_link_deliver_CT(qdr_core_t *core, 
qdr_action_t *action, bool dis
     qdr_delivery_t *dlv  = action->args.connection.delivery;
     qdr_link_t     *link = dlv->link;
 
-    //
-    // If this is an attach-routed link, put the delivery directly onto the 
peer link
-    //
+
     if (link->connected_link) {
+        //
+        // If this is an attach-routed link, put the delivery directly onto 
the peer link
+        //
         qdr_delivery_t *peer = qdr_forward_new_delivery_CT(core, dlv, 
link->connected_link, dlv->msg);
 
         qdr_delivery_copy_extension_state(dlv, peer, true);
+
         //
         // Copy the delivery tag.  For link-routing, the delivery tag must be 
preserved.
         //
@@ -717,9 +860,9 @@ static void qdr_link_deliver_CT(qdr_core_t *core, 
qdr_action_t *action, bool dis
         memcpy(peer->tag, action->args.connection.tag, peer->tag_length);
 
         qdr_forward_deliver_CT(core, link->connected_link, peer);
-        qd_message_free(dlv->msg);
-        dlv->msg = 0;
+
         link->total_deliveries++;
+
         if (!dlv->settled) {
             DEQ_INSERT_TAIL(link->unsettled, dlv);
             dlv->where = QDR_DELIVERY_IN_UNSETTLED;
@@ -754,7 +897,7 @@ static void qdr_link_deliver_CT(qdr_core_t *core, 
qdr_action_t *action, bool dis
         }
 
         //
-        // Give the action reference to the qdr_link_forward function.
+        // Give the action reference to the qdr_link_forward function. Don't 
decref/incref.
         //
         qdr_link_forward_CT(core, link, dlv, addr);
     } else {
@@ -869,6 +1012,99 @@ static void qdr_delete_delivery_CT(qdr_core_t *core, 
qdr_action_t *action, bool
 }
 
 
+static void qdr_deliver_continue_peers_CT(qdr_core_t *core, qdr_delivery_t 
*in_dlv)
+{
+    qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
+    while (peer) {
+        qdr_link_work_t *work = peer->link_work;
+        //
+        // Determines if the peer connection can be activated.
+        // For a large message, the peer delivery's link_work MUST be at the 
head of the peer link's work list. This link work is only removed
+        // after the streaming message has been sent.
+        //
+        if (work) {
+            sys_mutex_lock(peer->link->conn->work_lock);
+            if (work == DEQ_HEAD(peer->link->work_list)) {
+                qdr_add_link_ref(&peer->link->conn->links_with_work, 
peer->link, QDR_LINK_LIST_CLASS_WORK);
+                sys_mutex_unlock(peer->link->conn->work_lock);
+
+                //
+                // Activate the outgoing connection for later processing.
+                //
+                qdr_connection_activate_CT(core, peer->link->conn);
+            }
+            else {
+                sys_mutex_unlock(peer->link->conn->work_lock);
+
+            }
+        }
+
+        peer = qdr_delivery_next_peer_CT(in_dlv);
+    }
+}
+
+
+static void qdr_deliver_continue_CT(qdr_core_t *core, qdr_action_t *action, 
bool discard)
+{
+    if (discard)
+        return;
+
+    qdr_delivery_t *in_dlv  = action->args.connection.delivery;
+
+    // This decref is for the action reference
+    qdr_delivery_decref_CT(core, in_dlv);
+
+    //
+    // If it is already in the undelivered list or it has no peers, don't try 
to deliver this again.
+    //
+    if (in_dlv->where == QDR_DELIVERY_IN_UNDELIVERED || 
!qdr_delivery_has_peer_CT(in_dlv))
+        return;
+
+    qdr_deliver_continue_peers_CT(core, in_dlv);
+
+
+    if (qd_message_receive_complete(qdr_delivery_message(in_dlv))) {
+        //
+        // The entire message has now been received. Check to see if there are 
in process subscriptions that need to
+        // receive this message. in process subscriptions, at this time, can 
deal only with full messages.
+        //
+        qdr_subscription_t *sub = DEQ_HEAD(in_dlv->subscriptions);
+        while (sub) {
+            DEQ_REMOVE_HEAD(in_dlv->subscriptions);
+            qdr_forward_on_message_CT(core, sub, in_dlv ? in_dlv->link : 0, 
in_dlv->msg);
+            sub = DEQ_HEAD(in_dlv->subscriptions);
+        }
+
+        // This is a multicast delivery
+        if (qdr_is_addr_treatment_multicast(in_dlv->link->owning_addr)) {
+            assert(in_dlv->where == QDR_DELIVERY_IN_SETTLED);
+            //
+            // The router will settle on behalf of the receiver in the case of 
multicast and send out settled
+            // deliveries to the receivers.
+            //
+            in_dlv->disposition = PN_ACCEPTED;
+            qdr_delivery_push_CT(core, in_dlv);
+
+            //
+            // The in_dlv has one or more peers. These peers will have to be 
unlinked.
+            //
+            qdr_delivery_t *peer = qdr_delivery_first_peer_CT(in_dlv);
+            qdr_delivery_t *next_peer = 0;
+            while (peer) {
+                next_peer = qdr_delivery_next_peer_CT(in_dlv);
+                qdr_delivery_unlink_peers_CT(core, in_dlv, peer);
+                peer = next_peer;
+            }
+
+            // Remove the delivery from the settled list and decref the in_dlv.
+            in_dlv->where = QDR_DELIVERY_NOWHERE;
+            qdr_delivery_decref_CT(core, in_dlv); // This decref is for 
removing the delivery from the settled list.
+            DEQ_REMOVE(in_dlv->link->settled, in_dlv);
+        }
+    }
+}
+
+
 /**
  * Add link-work to provide credit to the link in an IO thread
  */

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/src/router_node.c
----------------------------------------------------------------------
diff --git a/src/router_node.c b/src/router_node.c
index dd21fec..8c964e0 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -186,51 +186,47 @@ static qd_iterator_t *router_annotate_message(qd_router_t 
      *router,
  */
 static void AMQP_rx_handler(void* context, qd_link_t *link, pn_delivery_t *pnd)
 {
-    qd_router_t    *router   = (qd_router_t*) context;
-    pn_link_t      *pn_link  = qd_link_pn(link);
-    qdr_link_t     *rlink    = (qdr_link_t*) qd_link_get_context(link);
-    qd_connection_t  *conn   = qd_link_connection(link);
+    qd_router_t    *router       = (qd_router_t*) context;
+    pn_link_t      *pn_link      = qd_link_pn(link);
+    qdr_link_t     *rlink        = (qdr_link_t*) qd_link_get_context(link);
+    qd_connection_t  *conn       = qd_link_connection(link);
     const qd_server_config_t *cf = qd_connection_config(conn);
-    qdr_delivery_t *delivery = 0;
-    qd_message_t   *msg;
+    qdr_delivery_t *delivery     = pn_delivery_get_context(pnd);
 
     //
-    // Receive the message into a local representation.  If the returned 
message
-    // pointer is NULL, we have not yet received a complete message.
-    //
-    // Note:  In the link-routing case, consider cutting the message through.  
There's
-    //        no reason to wait for the whole message to be received before 
starting to
-    //        send it.
+    // Receive the message into a local representation.
     //
-    msg = qd_message_receive(pnd);
-
-    if (!msg)
-        return;
+    qd_message_t   *msg   = qd_message_receive(pnd);
+    bool receive_complete = qd_message_receive_complete(msg);
 
-    if (cf->log_message) {
-        char repr[qd_message_repr_len()];
-        char* message_repr = qd_message_repr((qd_message_t*)msg,
-                                             repr,
-                                             sizeof(repr),
-                                             cf->log_bits);
-        if (message_repr) {
-            qd_log(qd_message_log_source(), QD_LOG_TRACE, "Link %s received 
%s",
-                   pn_link_name(pn_link),
-                   message_repr);
+    if (receive_complete) {
+        //
+        // The entire message has been received and we are ready to consume 
the delivery by calling pn_link_advance().
+        //
+        pn_link_advance(pn_link);
+
+        // Since the entire message has been received, we can print out its 
contents to the log if necessary.
+        if (cf->log_message) {
+            char repr[qd_message_repr_len()];
+            char* message_repr = qd_message_repr((qd_message_t*)msg,
+                                                 repr,
+                                                 sizeof(repr),
+                                                 cf->log_bits);
+            if (message_repr) {
+                qd_log(qd_message_log_source(), QD_LOG_TRACE, "Link %s 
received %s",
+                       pn_link_name(pn_link),
+                       message_repr);
+            }
         }
     }
 
     //
-    // Consume the delivery.
-    //
-    pn_link_advance(pn_link);
-
-    //
     // If there's no router link, free the message and finish.  It's likely 
that the link
     // is closing.
     //
     if (!rlink) {
-        qd_message_free(msg);
+        if (receive_complete) // The entire message has been received but 
there is nowhere to send it to, free it and do nothing.
+            qd_message_free(msg);
         return;
     }
 
@@ -239,20 +235,33 @@ static void AMQP_rx_handler(void* context, qd_link_t 
*link, pn_delivery_t *pnd)
     //
     if (qdr_link_is_routed(rlink)) {
         pn_delivery_tag_t dtag = pn_delivery_tag(pnd);
-        delivery = qdr_link_deliver_to_routed_link(rlink, msg, 
pn_delivery_settled(pnd),
-                                                   (uint8_t*) dtag.start, 
dtag.size,
-                                                   
pn_disposition_type(pn_delivery_remote(pnd)),
-                                                   
pn_disposition_data(pn_delivery_remote(pnd)));
-
+        //
+        // A delivery object was already available via 
pn_delivery_get_context. This means a qdr_delivery was already created. Use it 
to continue delivery.
+        //
         if (delivery) {
-            if (pn_delivery_settled(pnd))
+            qdr_deliver_continue(delivery);
+
+            //
+            // Settle the proton delivery only if all the data has been 
received.
+            //
+            if (pn_delivery_settled(pnd) && receive_complete) {
                 pn_delivery_settle(pnd);
-            else {
-                pn_delivery_set_context(pnd, delivery);
-                qdr_delivery_set_context(delivery, pnd);
-                qdr_delivery_incref(delivery);
+                qdr_delivery_decref(router->router_core, delivery);
             }
         }
+        else {
+            delivery = qdr_link_deliver_to_routed_link(rlink,
+                                                       msg,
+                                                       
pn_delivery_settled(pnd),
+                                                       (uint8_t*) dtag.start,
+                                                       dtag.size,
+                                                       
pn_disposition_type(pn_delivery_remote(pnd)),
+                                                       
pn_disposition_data(pn_delivery_remote(pnd)));
+            pn_delivery_set_context(pnd, delivery);
+            qdr_delivery_set_context(delivery, pnd);
+            qdr_delivery_incref(delivery);
+        }
+
         return;
     }
 
@@ -294,134 +303,142 @@ static void AMQP_rx_handler(void* context, qd_link_t 
*link, pn_delivery_t *pnd)
     qd_message_depth_t  validation_depth = (anonymous_link || check_user) ? 
QD_DEPTH_PROPERTIES : QD_DEPTH_MESSAGE_ANNOTATIONS;
     bool                valid_message    = qd_message_check(msg, 
validation_depth);
 
-    if (valid_message) {
-        if (check_user) {
-            // This connection must not allow proxied user_id
-            qd_iterator_t *userid_iter  = qd_message_field_iterator(msg, 
QD_FIELD_USER_ID);
-            if (userid_iter) {
-                // The user_id property has been specified
-                if (qd_iterator_remaining(userid_iter) > 0) {
-                    // user_id property in message is not blank
-                    if (!qd_iterator_equal(userid_iter, (const unsigned char 
*)conn->user_id)) {
-                        // This message is rejected: attempted user proxy is 
disallowed
-                        qd_log(router->log_source, QD_LOG_DEBUG, "Message 
rejected due to user_id proxy violation. User:%s", conn->user_id);
-                        pn_link_flow(pn_link, 1);
-                        pn_delivery_update(pnd, PN_REJECTED);
-                        pn_delivery_settle(pnd);
-                        qd_message_free(msg);
-                        qd_iterator_free(userid_iter);
-                        return;
-                    }
+    if (!valid_message && receive_complete) {
+        //
+        // The entire message has been received and the message is still 
invalid.  Reject the message.
+        //
+        qd_message_set_discard(msg, true);
+        pn_link_flow(pn_link, 1);
+        pn_delivery_update(pnd, PN_REJECTED);
+        pn_delivery_settle(pnd);
+        qd_message_free(msg);
+    }
+
+    if (!valid_message) {
+        return;
+    }
+
+    if (delivery) {
+        qdr_deliver_continue(delivery);
+
+        if (pn_delivery_settled(pnd) && receive_complete) {
+            pn_delivery_settle(pnd);
+            qdr_delivery_decref(router->router_core, delivery);
+        }
+        return;
+    }
+
+    if (check_user) {
+        // This connection must not allow proxied user_id
+        qd_iterator_t *userid_iter  = qd_message_field_iterator(msg, 
QD_FIELD_USER_ID);
+        if (userid_iter) {
+            // The user_id property has been specified
+            if (qd_iterator_remaining(userid_iter) > 0) {
+                // user_id property in message is not blank
+                if (!qd_iterator_equal(userid_iter, (const unsigned char 
*)conn->user_id)) {
+                    // This message is rejected: attempted user proxy is 
disallowed
+                    qd_log(router->log_source, QD_LOG_DEBUG, "Message rejected 
due to user_id proxy violation. User:%s", conn->user_id);
+                    pn_link_flow(pn_link, 1);
+                    pn_delivery_update(pnd, PN_REJECTED);
+                    pn_delivery_settle(pnd);
+                    qd_message_free(msg);
+                    qd_iterator_free(userid_iter);
+                    return;
                 }
-                qd_iterator_free(userid_iter);
             }
+            qd_iterator_free(userid_iter);
         }
+    }
 
-        qd_message_message_annotations(msg);
-        qd_bitmask_t        *link_exclusions;
+    qd_message_message_annotations(msg);
+    qd_bitmask_t        *link_exclusions;
 
-        qd_iterator_t *ingress_iter = router_annotate_message(router, msg, 
&link_exclusions);
+    qd_iterator_t *ingress_iter = router_annotate_message(router, msg, 
&link_exclusions);
 
-        if (anonymous_link) {
-            qd_iterator_t *addr_iter = 0;
-            int phase = 0;
-            
-            //
-            // If the message has delivery annotations, get the to-override 
field from the annotations.
-            //
-            qd_parsed_field_t *ma_to = qd_message_get_to_override(msg);
-            if (ma_to) {
-                addr_iter = qd_iterator_dup(qd_parse_raw(ma_to));
-                phase = qd_message_get_phase_val(msg);
-            }
+    if (anonymous_link) {
+        qd_iterator_t *addr_iter = 0;
+        int phase = 0;
 
-            //
-            // Still no destination address?  Use the TO field from the 
message properties.
-            //
-            if (!addr_iter) {
-                addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);
-
-                //
-                // If the address came from the TO field and we need to apply 
a tenant-space,
-                // set the to-override with the annotated address.
-                //
-                if (addr_iter && tenant_space) {
-                    qd_iterator_reset_view(addr_iter, 
ITER_VIEW_ADDRESS_WITH_SPACE);
-                    qd_iterator_annotate_space(addr_iter, tenant_space, 
tenant_space_len);
-                    qd_composed_field_t *to_override = qd_compose_subfield(0);
-                    qd_compose_insert_string_iterator(to_override, addr_iter);
-                    qd_message_set_to_override_annotation(msg, to_override);
-                }
-            }
+        //
+        // If the message has delivery annotations, get the to-override field 
from the annotations.
+        //
+        qd_parsed_field_t *ma_to = qd_message_get_to_override(msg);
+        if (ma_to) {
+            addr_iter = qd_iterator_dup(qd_parse_raw(ma_to));
+            phase = qd_message_get_phase_annotation(msg);
+        }
+
+        //
+        // Still no destination address?  Use the TO field from the message 
properties.
+        //
+        if (!addr_iter) {
+            addr_iter = qd_message_field_iterator(msg, QD_FIELD_TO);
 
-            if (addr_iter) {
-                qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
-                if (phase > 0)
-                    qd_iterator_annotate_phase(addr_iter, '0' + (char) phase);
-                delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, 
addr_iter, pn_delivery_settled(pnd),
-                                               link_exclusions);
-            }
-        } else {
             //
-            // This is a targeted link, not anonymous.
+            // If the address came from the TO field and we need to apply a 
tenant-space,
+            // set the to-override with the annotated address.
             //
-            const char *term_addr = 
pn_terminus_get_address(qd_link_remote_target(link));
-            if (!term_addr)
-                term_addr = pn_terminus_get_address(qd_link_source(link));
-
-            if (term_addr) {
+            if (addr_iter && tenant_space) {
+                qd_iterator_reset_view(addr_iter, 
ITER_VIEW_ADDRESS_WITH_SPACE);
+                qd_iterator_annotate_space(addr_iter, tenant_space, 
tenant_space_len);
                 qd_composed_field_t *to_override = qd_compose_subfield(0);
-                if (tenant_space) {
-                    qd_iterator_t *aiter = qd_iterator_string(term_addr, 
ITER_VIEW_ADDRESS_WITH_SPACE);
-                    qd_iterator_annotate_space(aiter, tenant_space, 
tenant_space_len);
-                    qd_compose_insert_string_iterator(to_override, aiter);
-                    qd_iterator_free(aiter);
-                } else
-                    qd_compose_insert_string(to_override, term_addr);
+                qd_compose_insert_string_iterator(to_override, addr_iter);
                 qd_message_set_to_override_annotation(msg, to_override);
-                int phase = qdr_link_phase(rlink);
-                if (phase != 0)
-                    qd_message_set_phase_annotation(msg, phase);
             }
-            delivery = qdr_link_deliver(rlink, msg, ingress_iter, 
pn_delivery_settled(pnd), link_exclusions);
         }
 
-        if (delivery) {
-            if (pn_delivery_settled(pnd))
-                pn_delivery_settle(pnd);
-            else {
-                pn_delivery_set_context(pnd, delivery);
-                qdr_delivery_set_context(delivery, pnd);
-                qdr_delivery_incref(delivery);
-            }
-        } else {
-            //
-            // The message is now and will always be unroutable because there 
is no address.
-            //
-            pn_link_flow(pn_link, 1);
-            pn_delivery_update(pnd, PN_REJECTED);
-            pn_delivery_settle(pnd);
-            qd_message_free(msg);
+        if (addr_iter) {
+            qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
+            if (phase > 0)
+                qd_iterator_annotate_phase(addr_iter, '0' + (char) phase);
+            delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, 
addr_iter, pn_delivery_settled(pnd),
+                                           link_exclusions);
         }
-
+    } else {
         //
-        // Rules for delivering messages:
+        // This is a targeted link, not anonymous.
         //
-        // For addressed (non-anonymous) links:
-        //   to-override must be set (done in the core?)
-        //   uses qdr_link_deliver to hand over to the core
+        const char *term_addr = 
pn_terminus_get_address(qd_link_remote_target(link));
+        if (!term_addr)
+            term_addr = pn_terminus_get_address(qd_link_source(link));
+
+        if (term_addr) {
+            qd_composed_field_t *to_override = qd_compose_subfield(0);
+            if (tenant_space) {
+                qd_iterator_t *aiter = qd_iterator_string(term_addr, 
ITER_VIEW_ADDRESS_WITH_SPACE);
+                qd_iterator_annotate_space(aiter, tenant_space, 
tenant_space_len);
+                qd_compose_insert_string_iterator(to_override, aiter);
+                qd_iterator_free(aiter);
+            } else
+                qd_compose_insert_string(to_override, term_addr);
+            qd_message_set_to_override_annotation(msg, to_override);
+            int phase = qdr_link_phase(rlink);
+            if (phase != 0)
+                qd_message_set_phase_annotation(msg, phase);
+        }
+        delivery = qdr_link_deliver(rlink, msg, ingress_iter, 
pn_delivery_settled(pnd), link_exclusions);
+    }
+
+    if (delivery) {
         //
-        // For anonymous links:
-        //   If there's a to-override in the annotations, use that address
-        //   Or, use the 'to' field in the message properties
+        // Settle the proton delivery only if all the data has arrived
         //
+        if (pn_delivery_settled(pnd)) {
+            if (receive_complete) {
+                pn_delivery_settle(pnd);
+                return;
+            }
+        }
 
-
-
+        // If this delivery is unsettled or if this is a settled multi-frame 
large message, set the context
+        pn_delivery_set_context(pnd, delivery);
+        qdr_delivery_set_context(delivery, pnd);
+        qdr_delivery_incref(delivery);
     } else {
         //
-        // Message is invalid.  Reject the message and don't involve the 
router core.
+        // If there is no delivery, the message is now and will always be 
unroutable because there is no address.
         //
+        qd_message_set_discard(msg, true);
         pn_link_flow(pn_link, 1);
         pn_delivery_update(pnd, PN_REJECTED);
         pn_delivery_settle(pnd);
@@ -447,6 +464,11 @@ static void AMQP_disposition_handler(void* context, 
qd_link_t *link, pn_delivery
     if (!delivery)
         return;
 
+    bool receive_complete = qdr_delivery_receive_complete(delivery);
+
+    if (!receive_complete)
+        return;
+
     pn_disposition_t *disp   = pn_delivery_remote(pnd);
     pn_condition_t *cond     = pn_disposition_condition(disp);
     qdr_error_t    *error    = qdr_error_from_pn(cond);
@@ -459,6 +481,7 @@ static void AMQP_disposition_handler(void* context, 
qd_link_t *link, pn_delivery
     if (pn_delivery_settled(pnd)) {
         pn_delivery_set_context(pnd, 0);
         qdr_delivery_set_context(delivery, 0);
+        qdr_delivery_set_cleared_proton_ref(delivery, true);
 
         //
         // Don't decref the delivery here.  Rather, we will _give_ the 
reference to the core.
@@ -479,8 +502,9 @@ static void AMQP_disposition_handler(void* context, 
qd_link_t *link, pn_delivery
     //
     // If settled, close out the delivery
     //
-    if (pn_delivery_settled(pnd))
+    if (pn_delivery_settled(pnd)) {
         pn_delivery_settle(pnd);
+    }
 }
 
 
@@ -988,21 +1012,21 @@ static int CORE_link_push(void *context, qdr_link_t 
*link, int limit)
     qd_link_t   *qlink  = (qd_link_t*) qdr_link_get_context(link);
     if (!qlink)
         return 0;
-    
+
     pn_link_t *plink = qd_link_pn(qlink);
 
     if (plink) {
         int link_credit = pn_link_credit(plink);
         if (link_credit > limit)
             link_credit = limit;
-        qdr_link_process_deliveries(router->router_core, link, link_credit);
-        return link_credit;
-    }
 
+        if (link_credit > 0)
+            // We will not bother calling qdr_link_process_deliveries if we 
have no credit.
+            return qdr_link_process_deliveries(router->router_core, link, 
link_credit);
+    }
     return 0;
 }
 
-
 static void CORE_link_deliver(void *context, qdr_link_t *link, qdr_delivery_t 
*dlv, bool settled)
 {
     qd_router_t *router = (qd_router_t*) context;
@@ -1014,37 +1038,61 @@ static void CORE_link_deliver(void *context, qdr_link_t 
*link, qdr_delivery_t *d
     if (!plink)
         return;
 
-    const char *tag;
-    int         tag_length;
-
-    qdr_delivery_tag(dlv, &tag, &tag_length);
-
-    pn_delivery(plink, pn_dtag(tag, tag_length));
-    pn_delivery_t *pdlv = pn_link_current(plink);
-
-    // handle any delivery-state on the transfer e.g. transactional-state
-    qdr_delivery_write_extension_state(dlv, pdlv, true);
     //
     // If the remote send settle mode is set to 'settled' then settle the 
delivery on behalf of the receiver.
     //
     bool remote_snd_settled = qd_link_remote_snd_settle_mode(qlink) == 
PN_SND_SETTLED;
+    pn_delivery_t *pdlv = 0;
+
+    if (!qdr_delivery_tag_sent(dlv)) {
+        const char *tag;
+        int         tag_length;
+
+        qdr_delivery_tag(dlv, &tag, &tag_length);
+
+        // Create a new proton delivery on link 'plink'
+        pn_delivery(plink, pn_dtag(tag, tag_length));
+
+        pdlv = pn_link_current(plink);
 
-    if (!settled && !remote_snd_settled) {
-        pn_delivery_set_context(pdlv, dlv);
-        qdr_delivery_set_context(dlv, pdlv);
-        qdr_delivery_incref(dlv);
+        // handle any delivery-state on the transfer e.g. transactional-state
+        qdr_delivery_write_extension_state(dlv, pdlv, true);
+
+        //
+        // If the remote send settle mode is set to 'settled', we should 
settle the delivery on behalf of the receiver.
+        //
+        if (!settled && !remote_snd_settled) {
+            if (qdr_delivery_get_context(dlv) == 0)  {
+                pn_delivery_set_context(pdlv, dlv);
+                qdr_delivery_set_context(dlv, pdlv);
+                qdr_delivery_incref(dlv);
+            }
+        }
+
+        qdr_delivery_set_tag_sent(dlv, true);
+    }
+
+    if (!pdlv) {
+        pdlv = pn_link_current(plink);
     }
 
     qd_message_send(qdr_delivery_message(dlv), qlink, 
qdr_link_strip_annotations_out(link));
 
-    if (!settled && remote_snd_settled)
-        // Tell the core that the delivery has been accepted and settled, 
since we are settling on behalf of the receiver
-        qdr_delivery_update_disposition(router->router_core, dlv, PN_ACCEPTED, 
true, 0, 0, false);
+    bool send_complete = qdr_delivery_send_complete(dlv);
+
+    if (send_complete) {
+        if (!settled && remote_snd_settled) {
+            // Tell the core that the delivery has been accepted and settled, 
since we are settling on behalf of the receiver
+            qdr_delivery_update_disposition(router->router_core, dlv, 
PN_ACCEPTED, true, 0, 0, false);
+        }
 
-    if (settled || remote_snd_settled)
-        pn_delivery_settle(pdlv);
+        pn_link_advance(plink);
 
-    pn_link_advance(plink);
+        if (settled || remote_snd_settled) {
+            if (pdlv)
+                pn_delivery_settle(pdlv);
+        }
+    }
 }
 
 
@@ -1089,6 +1137,7 @@ static void CORE_delivery_update(void *context, 
qdr_delivery_t *dlv, uint64_t di
         qdr_delivery_set_context(dlv, 0);
         pn_delivery_set_context(pnd, 0);
         pn_delivery_settle(pnd);
+        qdr_delivery_set_cleared_proton_ref(dlv, true);
         qdr_delivery_decref(router->router_core, dlv);
     }
 }

http://git-wip-us.apache.org/repos/asf/qpid-dispatch/blob/c9262728/tests/system_tests_one_router.py
----------------------------------------------------------------------
diff --git a/tests/system_tests_one_router.py b/tests/system_tests_one_router.py
index b8f6f9c..f520871 100644
--- a/tests/system_tests_one_router.py
+++ b/tests/system_tests_one_router.py
@@ -1100,10 +1100,11 @@ class RouterTest(TestCase):
         test.run()
         self.assertEqual(None, test.error)
 
-    def test_17_multiframe_presettled(self):
-        test = MultiframePresettledTest(self.address)
-        test.run()
-        self.assertEqual(None, test.error)
+    # Will uncomment this test once 
https://issues.apache.org/jira/browse/PROTON-1514 is fixed
+    #def test_17_multiframe_presettled(self):
+    #    test = MultiframePresettledTest(self.address)
+    #    test.run()
+    #    self.assertEqual(None, test.error)
 
     def test_18_released_vs_modified(self):
         test = ReleasedVsModifiedTest(self.address)
@@ -1135,6 +1136,11 @@ class RouterTest(TestCase):
         test.run()
         self.assertTrue(test.passed)
 
+    def test_22_large_streaming_test(self):
+        test = LargeMessageStreamTest(self.address)
+        test.run()
+        self.assertEqual(None, test.error)
+
     def test_reject_disposition(self):
         test = RejectDispositionTest(self.address)
         test.run()
@@ -1352,6 +1358,53 @@ class MulticastUnsettledTest(MessagingHandler):
     def run(self):
         Container(self).run()
 
+class LargeMessageStreamTest(MessagingHandler):
+    def __init__(self, address):
+        super(LargeMessageStreamTest, self).__init__()
+        self.address = address
+        self.dest = "LargeMessageStreamTest"
+        self.error = None
+        self.count = 10
+        self.n_sent = 0
+        self.timer = None
+        self.conn = None
+        self.sender = None
+        self.receiver = None
+        self.n_received = 0
+        self.body = ""
+        for i in range(10000):
+            self.body += "0123456789101112131415"
+
+    def check_if_done(self):
+        if self.n_received == self.count:
+            self.timer.cancel()
+            self.conn.close()
+
+    def timeout(self):
+        self.error = "Timeout Expired: sent=%d, received=%d" % (self.n_sent, 
self.n_received)
+        self.conn.close()
+
+    def on_start(self, event):
+        self.timer = event.reactor.schedule(TIMEOUT, Timeout(self))
+        self.conn = event.container.connect(self.address)
+        self.sender = event.container.create_sender(self.conn, self.dest)
+        self.receiver = event.container.create_receiver(self.conn, self.dest, 
name="A")
+        self.receiver.flow(self.count)
+
+    def on_sendable(self, event):
+        for i in range(self.count):
+            msg = Message(body=self.body)
+            # send(msg) calls the stream function which streams data from 
sender to the router
+            event.sender.send(msg)
+            self.n_sent += 1
+
+    def on_message(self, event):
+        self.n_received += 1
+        self.check_if_done()
+
+    def run(self):
+        Container(self).run()
+
 
 class MultiframePresettledTest(MessagingHandler):
     def __init__(self, address):


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to