This is an automated email from the ASF dual-hosted git repository.

kgiusti pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 0f03e24a6a3e25bc5e55fc0383c411e6c842fb87
Author: Kenneth Giusti <[email protected]>
AuthorDate: Mon Jan 17 09:19:34 2022 -0500

    DISPATCH-2309: Ensure rx message validation succeeds before forwarding
    
    o Determine the proper validation depth if message logging used
    o move message logging calls to point of forwarding
    o Include link-routed messages in validation
    o Fail if attempting to validate after streaming buffers freed
    o Extra Credit: add message rejection errors and remove dead code
---
 src/message.c                                |  76 ++--------
 src/router_node.c                            | 219 ++++++++++++++-------------
 tests/system_tests_log_message_components.py |  22 ++-
 3 files changed, 142 insertions(+), 175 deletions(-)

diff --git a/src/message.c b/src/message.c
index 1a5fdfc..e35a14f 100644
--- a/src/message.c
+++ b/src/message.c
@@ -2082,11 +2082,21 @@ static qd_message_depth_status_t 
qd_message_check_LH(qd_message_content_t *conte
     if (depth <= content->parse_depth || depth == QD_DEPTH_NONE)
         return QD_MESSAGE_DEPTH_OK; // We've already parsed at least this deep
 
+    // Is there any data to check?  This will also check for null messages, 
which
+    // are not valid:
+    //
     qd_buffer_t *buffer  = DEQ_HEAD(content->buffers);
-    if (!buffer) {
+    if (!buffer || qd_buffer_size(buffer) == 0) {
         return IS_ATOMIC_FLAG_SET(&content->receive_complete) ? 
QD_MESSAGE_DEPTH_INVALID : QD_MESSAGE_DEPTH_INCOMPLETE;
     }
 
+    if (content->buffers_freed) {
+        // this is likely a bug: the caller is attempting to access a
+        // section after the start of the message has already been sent and
+        // released, rendering the parse_buffer/cursor position invalid.
+        return QD_MESSAGE_DEPTH_INVALID;
+    }
+
     if (content->parse_buffer == 0) {
         content->parse_buffer = buffer;
         content->parse_cursor = qd_buffer_base(content->parse_buffer);
@@ -2162,12 +2172,6 @@ static qd_message_depth_status_t 
qd_message_check_LH(qd_message_content_t *conte
         // fallthrough
 
     case QD_DEPTH_BODY:
-        // In the case of multi-buffer streaming we may discard buffers that
-        // contain only the Body or Footer section for those messages that are
-        // through-only.  We really cannot validate those sections if that 
should happen
-        //
-        if (content->buffers_freed)
-            return QD_MESSAGE_DEPTH_OK;
 
         //
         // BODY (not optional, but proton allows it - see PROTON-2085)
@@ -2200,9 +2204,6 @@ static qd_message_depth_status_t 
qd_message_check_LH(qd_message_content_t *conte
         //
         // FOOTER (optional)
         //
-        if (content->buffers_freed) // see above
-            return QD_MESSAGE_DEPTH_OK;
-
         last_section = QD_DEPTH_ALL;
         rc = message_check_depth_LH(content, QD_DEPTH_ALL,
                                     FOOTER_LONG, FOOTER_SHORT, TAGS_MAP,
@@ -2837,61 +2838,6 @@ qd_message_stream_data_result_t 
qd_message_next_stream_data(qd_message_t *in_msg
 }
 
 
-int qd_message_read_body(qd_message_t *in_msg, pn_raw_buffer_t* buffers, int 
length)
-{
-    qd_message_pvt_t     *msg     = (qd_message_pvt_t*) in_msg;
-    if (!(msg->cursor.buffer && msg->cursor.cursor)) {
-        qd_field_location_t  *loc     = qd_message_field_location(in_msg, 
QD_FIELD_BODY);
-        if (!loc || loc->tag == QD_AMQP_NULL)
-            return 0;
-        // TODO: need to actually determine this, could be different if vbin32 
sent
-        int preamble = 5;
-        if (loc->offset + preamble < qd_buffer_size(loc->buffer)) {
-            msg->cursor.buffer = loc->buffer;
-            msg->cursor.cursor = qd_buffer_base(loc->buffer) + loc->offset + 
preamble;
-        } else {
-            msg->cursor.buffer = DEQ_NEXT(loc->buffer);
-            if (!msg->cursor.buffer) return 0;
-            msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer) + 
((loc->offset + preamble) - qd_buffer_size(loc->buffer));
-        }
-    }
-
-    qd_buffer_t   *buf    = msg->cursor.buffer;
-    unsigned char *cursor = msg->cursor.cursor;
-
-    // if we are at the end of the current buffer, try to move to the
-    // next buffer
-    if (cursor == qd_buffer_base(buf) + qd_buffer_size(buf)) {
-        buf = DEQ_NEXT(buf);
-        if (buf) {
-            cursor = qd_buffer_base(buf);
-            msg->cursor.buffer = buf;
-            msg->cursor.cursor = cursor;
-        } else {
-            return 0;
-        }
-    }
-
-    int count;
-    for (count = 0; count < length && buf; count++) {
-        buffers[count].bytes = (char*) qd_buffer_base(buf);
-        buffers[count].capacity = qd_buffer_size(buf);
-        buffers[count].size = qd_buffer_size(buf);
-        buffers[count].offset = cursor - qd_buffer_base(buf);
-        buffers[count].context = (uintptr_t) buf;
-        buf = DEQ_NEXT(buf);
-        if (buf) {
-            cursor = qd_buffer_base(buf);
-            msg->cursor.buffer = buf;
-            msg->cursor.cursor = cursor;
-        } else {
-            msg->cursor.cursor = qd_buffer_base(msg->cursor.buffer) + 
qd_buffer_size(msg->cursor.buffer);
-        }
-    }
-    return count;
-}
-
-
 qd_parsed_field_t *qd_message_get_ingress(qd_message_t *msg)
 {
     return ((qd_message_pvt_t*) msg)->content->ma_pf_ingress;
diff --git a/src/router_node.c b/src/router_node.c
index de9c70a..83e74fb 100644
--- a/src/router_node.c
+++ b/src/router_node.c
@@ -61,6 +61,25 @@ const char *QD_AMQP_COND_OVERSIZE_DESCRIPTION = "Message 
size exceeded";
 // qdr_delivery.context => pn_delivery
 //
 
+// reject a delivery, setting the appropriate condition fields so the sender can
+// determine the reason the message was rejected
+//
+static inline void _reject_delivery(pn_delivery_t *pnd, const char 
*error_name, const char *description)
+{
+    assert(error_name && description);
+    pn_condition_t *lcond = pn_disposition_condition(pn_delivery_local(pnd));
+    (void) pn_condition_set_name(lcond, error_name);
+    (void) pn_condition_set_description(lcond, description);
+    pn_delivery_update(pnd, PN_REJECTED);
+}
+
+
+static inline const char *_get_tenant_space(qd_connection_t *conn, int *length)
+{
+    qdr_connection_t *qdr_conn = (qdr_connection_t*) 
qd_connection_get_context(conn);
+    return qdr_connection_get_tenant_space(qdr_conn, length);
+}
+
 
 static void qdr_node_connect_deliveries(qd_link_t *link, qdr_delivery_t *qdlv, 
pn_delivery_t *pdlv)
 {
@@ -475,47 +494,9 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     qd_message_t   *msg   = qd_message_receive(pnd);
     bool receive_complete = qd_message_receive_complete(msg);
 
-    //
-    // The very first time AMQP_rx_handler is called on a PN_DELIVERY event, 
it calls  qd_message_receive(). When qd_message_receive() returns, we check 
here if
-    // there are any data in the content buffers. If there is no content in 
the buffers, there is no reason to route the delivery. We will wait for some 
data
-    // in the buffers before we start to route the delivery.
-    // Notice that the if statement checks for the existence of a delivery 
(qdr_delivery_t). Existence of a delivery means that the delivery has been 
routed when
-    // there was data in the buffers (When a delivery has been routed 
successfully, the delivery (qdr_delivery_t) will be non null)
-    //
-    // The following if statement will deal with the following cases:-
-    // 1. We receive one empty transfer frame with more=true followed by 
another empty transfer frame with (more=false and abort=true) or with just 
more=false
-    //    In this case, there is no data at all in the message content 
buffers, we will reject the message when receive_complete=true. We will never 
route this
-    //    delivery, so core thread will not be involved
-    // 2. We receive 2 or more empty transfer frames with more=true followed 
by another empty transfer frame with (more=false and abort=true) or with just 
more=false
-    //    This case is similar to #1. We have no content in any of the 
buffers, we will reject this message after receive_complete=true. We will never 
route this
-    //    delivery, so core thread will not be involved
-    // 3. Exactly one empty transfer frame with more=false and abort=false
-    //    In this case, again there is still no content in any of the buffers, 
we will reject this message. Again, we will not route this message, so the core 
thread is not involved.
-    //
-    if (!delivery && !qd_message_has_data_in_content_or_pending_buffers(msg)) {
-        if (receive_complete) {
-            // There is no qdr_delivery_t (delivery) yet which means this 
message has not been routed yet (the first run of this function is not complete 
yet) and
-            // the message is fully received (receive_complete=true) but there 
is no content in the message buffers.
-            // This is only possible if there were one or more empty transfer 
frames.
-            // Since there is nothing in the message, we will reject it (AMQP 
message must have a non empty message body)
-            pn_link_flow(pn_link, 1);
-            if (pn_delivery_aborted(pnd))
-                qd_message_set_discard(msg, true);
-            pn_delivery_update(pnd, PN_REJECTED);
-            pn_delivery_settle(pnd);
-            // qd_message_free will free all the associated content buffers 
and also the content->pending buffer
-            qd_message_free(msg);
-            qd_log(router->log_source, QD_LOG_TRACE, "Message rejected due to 
empty message");
-        }
-
-        return false;
-    }
-
     if (!qd_message_oversize(msg)) {
         // message not rejected as oversize
         if (receive_complete) {
-            log_link_message(conn, pn_link, msg);
-
             //
             // The entire message has been received and we are ready to 
consume the delivery by calling pn_link_advance().
             //
@@ -568,10 +549,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
         // message is oversize
         if (receive_complete) {
             // set condition, reject, and settle the incoming delivery
-            pn_condition_t *lcond = 
pn_disposition_condition(pn_delivery_local(pnd));
-            (void) pn_condition_set_name(       lcond, 
QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED);
-            (void) pn_condition_set_description(lcond, 
QD_AMQP_COND_OVERSIZE_DESCRIPTION);
-            pn_delivery_update(pnd, PN_REJECTED);
+            _reject_delivery(pnd, QD_AMQP_COND_MESSAGE_SIZE_EXCEEDED, 
QD_AMQP_COND_OVERSIZE_DESCRIPTION);
             pn_delivery_settle(pnd);
             // close the link
             pn_link_close(pn_link);
@@ -622,9 +600,58 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
     }
 
     //
+    // Validate the content of the delivery as an AMQP message.  This is done
+    // partially, only to validate that we can find the fields we need to route
+    // the message.
+    //
+    // If per-message tracing is configured then validate the sections
+    // necessary for logging.
+    //
+    // link-routing: it is not necessary to validate any sections, but doing so
+    // will force a message validity check and ensure the message is not null.
+    //
+    // If the link is anonymous, we must validate through the message
+    // properties to find the 'to' field.  If the link is not anonymous, we
+    // don't need the 'to' field as we will be using the address from the link
+    // target.
+    //
+    // Check if the user id needs to be validated (see below). If it does we
+    // need to validate the message properties section.
+    //
+    // Otherwise check the message annotations for router annotations necessary
+    // for forwarding.
+    //
+    const bool link_routed    = qdr_link_is_routed(rlink);
+    const bool anonymous_link = qdr_link_is_anonymous(rlink);
+    const bool check_user     = (conn->policy_settings && 
!conn->policy_settings->spec.allowUserIdProxy);
+    const qd_server_config_t *cf = qd_connection_config(conn);
+    const qd_message_depth_t depth = (cf && cf->log_bits != 0) ? 
QD_DEPTH_APPLICATION_PROPERTIES
+        : (link_routed) ? QD_DEPTH_HEADER
+        : (anonymous_link || check_user) ? QD_DEPTH_PROPERTIES
+        : QD_DEPTH_MESSAGE_ANNOTATIONS;
+
+    const qd_message_depth_status_t depth_valid = qd_message_check_depth(msg, 
depth);
+    switch (depth_valid) {
+    case QD_MESSAGE_DEPTH_INVALID:
+        qd_log(router->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Incoming message validation failed - 
rejected",
+               conn->connection_id,
+               qd_link_link_id(link));
+        qd_message_set_discard(msg, true);
+        pn_link_flow(pn_link, 1);
+        _reject_delivery(pnd, QD_AMQP_COND_DECODE_ERROR, "invalid message 
format");
+        pn_delivery_settle(pnd);
+        qd_message_free(msg);
+        return next_delivery;
+    case QD_MESSAGE_DEPTH_INCOMPLETE:
+        return false;  // stop rx processing
+    case QD_MESSAGE_DEPTH_OK:
+        break;
+    }
+
     // Handle the link-routed case
     //
-    if (qdr_link_is_routed(rlink)) {
+    if (link_routed) {
         pn_delivery_tag_t dtag = pn_delivery_tag(pnd);
 
         if (dtag.size > QDR_DELIVERY_TAG_MAX) {
@@ -632,7 +659,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
                    dtag.size, QDR_DELIVERY_TAG_MAX);
             qd_message_set_discard(msg, true);
             pn_link_flow(pn_link, 1);
-            pn_delivery_update(pnd, PN_REJECTED);
+            _reject_delivery(pnd, QD_AMQP_COND_INVALID_FIELD, "delivery tag 
length exceeded");
             if (receive_complete) {
                 pn_delivery_settle(pnd);
                 qd_message_free(msg);
@@ -640,6 +667,7 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
             return next_delivery;
         }
 
+        log_link_message(conn, pn_link, msg);
         delivery = qdr_link_deliver_to_routed_link(rlink,
                                                    msg,
                                                    pn_delivery_settled(pnd),
@@ -653,58 +681,13 @@ static bool AMQP_rx_handler(void* context, qd_link_t 
*link)
         return next_delivery;
     }
 
+    // Determine if the user of this connection is allowed to proxy the user_id
+    // of messages. A message user_id is proxied when the value in the message
+    // properties section differs from the authenticated user name of the
+    // connection.  If the user is not allowed to proxy the user_id then the
+    // message user_id must be blank or it must be equal to the connection user
+    // name.
     //
-    // Determine if the incoming link is anonymous.  If the link is addressed,
-    // there are some optimizations we can take advantage of.
-    //
-    bool anonymous_link = qdr_link_is_anonymous(rlink);
-
-    //
-    // Determine if the user of this connection is allowed to proxy the
-    // user_id of messages. A message user_id is proxied when the
-    // property value differs from the authenticated user name of the 
connection.
-    // If the user is not allowed to proxy the user_id then the message user_id
-    // must be blank or it must be equal to the connection user name.
-    //
-    bool              check_user   = false;
-    qdr_connection_t *qdr_conn     = (qdr_connection_t*) 
qd_connection_get_context(conn);
-    int               tenant_space_len;
-    const char       *tenant_space = qdr_connection_get_tenant_space(qdr_conn, 
&tenant_space_len);
-    if (conn->policy_settings)
-        check_user = !conn->policy_settings->spec.allowUserIdProxy;
-
-    //
-    // Validate the content of the delivery as an AMQP message.  This is done 
partially, only
-    // to validate that we can find the fields we need to route the message.
-    //
-    // If the link is anonymous, we must validate through the message 
properties to find the
-    // 'to' field.  If the link is not anonymous, we don't need the 'to' field 
as we will be
-    // using the address from the link target.
-    //
-    // Validate the content of the delivery as an AMQP message.  This is done 
partially, only
-    // to validate that we can find the fields we need to route the message.
-    //
-    // If the link is anonymous, we must validate through the message 
properties to find the
-    // 'to' field.  If the link is not anonymous, we don't need the 'to' field 
as we will be
-    // using the address from the link target.
-    //
-    qd_message_depth_t  validation_depth = (anonymous_link || check_user) ? 
QD_DEPTH_PROPERTIES : QD_DEPTH_MESSAGE_ANNOTATIONS;
-    qd_message_depth_status_t  depth_valid = qd_message_check_depth(msg, 
validation_depth);
-
-    if (depth_valid != QD_MESSAGE_DEPTH_OK) {
-        if (depth_valid == QD_MESSAGE_DEPTH_INVALID) {
-            qd_message_set_discard(msg, true);
-            pn_link_flow(pn_link, 1);
-            pn_delivery_update(pnd, PN_REJECTED);
-            pn_delivery_settle(pnd);
-            qd_message_free(msg);
-        } else {
-            // otherwise wait until more data arrives and re-try the validation
-            assert(depth_valid == QD_MESSAGE_DEPTH_INCOMPLETE);
-        }
-        return next_delivery;
-    }
-
     if (check_user) {
         // This connection must not allow proxied user_id
         qd_iterator_t *userid_iter  = qd_message_field_iterator(msg, 
QD_FIELD_USER_ID);
@@ -714,10 +697,14 @@ static bool AMQP_rx_handler(void* context, qd_link_t 
*link)
                 // user_id property in message is not blank
                 if (!qd_iterator_equal(userid_iter, (const unsigned char 
*)conn->user_id)) {
                     // This message is rejected: attempted user proxy is 
disallowed
-                    qd_log(router->log_source, QD_LOG_DEBUG, "Message rejected 
due to user_id proxy violation. User:%s", conn->user_id);
+                    qd_log(router->log_source, QD_LOG_DEBUG,
+                           "[C%"PRIu64"][L%"PRIu64"] Message rejected due to 
user_id proxy violation. User:%s",
+                           conn->connection_id,
+                           qd_link_link_id(link),
+                           conn->user_id);
                     qd_message_set_discard(msg, true);
                     pn_link_flow(pn_link, 1);
-                    pn_delivery_update(pnd, PN_REJECTED);
+                    _reject_delivery(pnd, QD_AMQP_COND_UNAUTHORIZED_ACCESS, 
"user_id proxy violation");
                     if (receive_complete) {
                         pn_delivery_settle(pnd);
                         qd_message_free(msg);
@@ -810,12 +797,16 @@ static bool AMQP_rx_handler(void* context, qd_link_t 
*link)
             // If the address came from the TO field and we need to apply a 
tenant-space,
             // set the to-override with the annotated address.
             //
-            if (addr_iter && tenant_space) {
-                qd_iterator_reset_view(addr_iter, 
ITER_VIEW_ADDRESS_WITH_SPACE);
-                qd_iterator_annotate_space(addr_iter, tenant_space, 
tenant_space_len);
-                qd_composed_field_t *to_override = qd_compose_subfield(0);
-                qd_compose_insert_string_iterator(to_override, addr_iter);
-                qd_message_set_to_override_annotation(msg, to_override);
+            if (addr_iter) {
+                int tenant_space_length;
+                const char *tenant_space = _get_tenant_space(conn, 
&tenant_space_length);
+                if (tenant_space) {
+                    qd_iterator_reset_view(addr_iter, 
ITER_VIEW_ADDRESS_WITH_SPACE);
+                    qd_iterator_annotate_space(addr_iter, tenant_space, 
tenant_space_length);
+                    qd_composed_field_t *to_override = qd_compose_subfield(0);
+                    qd_compose_insert_string_iterator(to_override, addr_iter);
+                    qd_message_set_to_override_annotation(msg, to_override);
+                }
             }
         }
 
@@ -824,16 +815,22 @@ static bool AMQP_rx_handler(void* context, qd_link_t 
*link)
                 qd_iterator_reset_view(addr_iter, ITER_VIEW_ADDRESS_HASH);
                 if (phase > 0)
                     qd_iterator_annotate_phase(addr_iter, '0' + (char) phase);
+
+                log_link_message(conn, pn_link, msg);
                 delivery = qdr_link_deliver_to(rlink, msg, ingress_iter, 
addr_iter, pn_delivery_settled(pnd),
                                                link_exclusions, ingress_index,
                                                pn_delivery_remote_state(pnd),
                                                
qd_delivery_read_remote_state(pnd));
             } else {
                 //reject
-                qd_log(router->log_source, QD_LOG_DEBUG, "Message rejected due 
to policy violation on target. User:%s", conn->user_id);
+                qd_log(router->log_source, QD_LOG_DEBUG,
+                       "[C%"PRIu64"][L%"PRIu64"] Message rejected due to 
policy violation on target. User:%s",
+                       conn->connection_id,
+                       qd_link_link_id(link),
+                       conn->user_id);
                 qd_message_set_discard(msg, true);
                 pn_link_flow(pn_link, 1);
-                pn_delivery_update(pnd, PN_REJECTED);
+                _reject_delivery(pnd, QD_AMQP_COND_UNAUTHORIZED_ACCESS, 
"policy violation on target");
                 if (receive_complete) {
                     pn_delivery_settle(pnd);
                     qd_message_free(msg);
@@ -862,9 +859,11 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
 
         if (term_addr) {
             qd_composed_field_t *to_override = qd_compose_subfield(0);
+            int tenant_space_length;
+            const char *tenant_space = _get_tenant_space(conn, 
&tenant_space_length);
             if (tenant_space) {
                 qd_iterator_t *aiter = qd_iterator_string(term_addr, 
ITER_VIEW_ADDRESS_WITH_SPACE);
-                qd_iterator_annotate_space(aiter, tenant_space, 
tenant_space_len);
+                qd_iterator_annotate_space(aiter, tenant_space, 
tenant_space_length);
                 qd_compose_insert_string_iterator(to_override, aiter);
                 qd_iterator_free(aiter);
             } else
@@ -874,6 +873,8 @@ static bool AMQP_rx_handler(void* context, qd_link_t *link)
             if (phase != 0)
                 qd_message_set_phase_annotation(msg, phase);
         }
+
+        log_link_message(conn, pn_link, msg);
         delivery = qdr_link_deliver(rlink, msg, ingress_iter, 
pn_delivery_settled(pnd), link_exclusions, ingress_index,
                                     pn_delivery_remote_state(pnd),
                                     qd_delivery_read_remote_state(pnd));
@@ -891,10 +892,14 @@ static bool AMQP_rx_handler(void* context, qd_link_t 
*link)
         //
         // If there is no delivery, the message is now and will always be 
unroutable because there is no address.
         //
+        qd_log(router->log_source, QD_LOG_DEBUG,
+               "[C%"PRIu64"][L%"PRIu64"] Message rejected - no address 
present",
+               conn->connection_id,
+               qd_link_link_id(link));
         qd_bitmask_free(link_exclusions);
         qd_message_set_discard(msg, true);
         pn_link_flow(pn_link, 1);
-        pn_delivery_update(pnd, PN_REJECTED);
+        _reject_delivery(pnd, QD_AMQP_COND_PRECONDITION_FAILED, "Routing 
failure: no address present");
         if (receive_complete) {
             pn_delivery_settle(pnd);
             qd_message_free(msg);
diff --git a/tests/system_tests_log_message_components.py 
b/tests/system_tests_log_message_components.py
index 79c2b56..2d3865f 100644
--- a/tests/system_tests_log_message_components.py
+++ b/tests/system_tests_log_message_components.py
@@ -26,6 +26,12 @@ from proton.handlers import MessagingHandler
 from proton.reactor import Container
 from qpid_dispatch_internal.compat import BINARY
 
+# force streaming in order to check that
+# freeing sent buffers does not lose fields
+# needed by logging
+MAX_FRAME = 1024
+BIG_BODY = 'X' * 1000000
+
 
 class RouterMessageLogTestBase(TestCase):
     def run_qdmanage(self, cmd, input=None, expect=Process.EXIT_OK, 
address=None):
@@ -55,7 +61,17 @@ class RouterMessageLogTestAll(RouterMessageLogTestBase):
         config = Qdrouterd.Config([
             ('router', {'mode': 'standalone', 'id': 'QDR'}),
 
-            ('listener', {'port': cls.tester.get_port(), 
'messageLoggingComponents': 'all'}),
+            ('listener', {'port': cls.tester.get_port(),
+                          'maxFrameSize': MAX_FRAME,
+                          'messageLoggingComponents': 'all'}),
+
+            ('log', {'module': 'MESSAGE',
+                     'enable': 'trace+',
+                     'outputFile': 'QDR-message.log'}),
+            ('log', {'module': 'DEFAULT',
+                     'enable': 'info+',
+                     'includeSource': 'true',
+                     'outputFile': 'QDR.log'}),
 
             ('address', {'prefix': 'closest', 'distribution': 'closest'}),
             ('address', {'prefix': 'spread', 'distribution': 'balanced'}),
@@ -201,12 +217,12 @@ class LogMessageTest(MessagingHandler):
             application_properties['app-property'] = [10, 20, 30]
             application_properties['some-other'] = symbol("O_one")
             msg.properties = application_properties
-            msg.body = "Hello World!"
+            msg.body = ["Hello World!", BIG_BODY]
             event.sender.send(msg)
             self.sent = True
 
     def on_message(self, event):
-        if "Hello World!" == event.message.body:
+        if "Hello World!" == event.message.body[0]:
             self.message_received = True
         event.connection.close()
 

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to