Github user alanconway commented on a diff in the pull request: https://github.com/apache/qpid-dispatch/pull/172#discussion_r126170901 --- Diff: src/message.c --- @@ -1151,89 +1287,140 @@ void qd_message_send(qd_message_t *in_msg, { qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; qd_message_content_t *content = msg->content; - qd_buffer_t *buf = DEQ_HEAD(content->buffers); - unsigned char *cursor; + qd_buffer_t *buf = 0; pn_link_t *pnl = qd_link_pn(link); - qd_buffer_list_t new_ma; - DEQ_INIT(new_ma); + // How many receivers does this message have? + int fanout = qd_message_fanout(in_msg); - // Process the message annotations if any - compose_message_annotations(msg, &new_ma, strip_annotations); + if (msg->sent_depth < QD_DEPTH_MESSAGE_ANNOTATIONS) { + // + // Start with the very first buffer; + // + buf = DEQ_HEAD(content->buffers); - // - // This is the case where the message annotations have been modified. - // The message send must be divided into sections: The existing header; - // the new message annotations; the rest of the existing message. - // Note that the original message annotations that are still in the - // buffer chain must not be sent. - // - // Start by making sure that we've parsed the message sections through - // the message annotations - // - // ??? NO LONGER NECESSARY??? 
- if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) { - qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message); - return; - } + if (!qd_message_check(in_msg, QD_DEPTH_MESSAGE_ANNOTATIONS)) { + qd_log(log_source, QD_LOG_ERROR, "Cannot send: %s", qd_error_message); + return; + } - // - // Send header if present - // - cursor = qd_buffer_base(buf); - if (content->section_message_header.length > 0) { - buf = content->section_message_header.buffer; - cursor = content->section_message_header.offset + qd_buffer_base(buf); - advance(&cursor, &buf, - content->section_message_header.length + content->section_message_header.hdr_length, - send_handler, (void*) pnl); - } + // + // Send header if present + // + unsigned char *cursor = qd_buffer_base(buf); + int header_consume = content->section_message_header.length + content->section_message_header.hdr_length; + if (content->section_message_header.length > 0) { + buf = content->section_message_header.buffer; + cursor = content->section_message_header.offset + qd_buffer_base(buf); + advance(&cursor, &buf, header_consume, send_handler, (void*) pnl); + } - // - // Send delivery annotation if present - // - if (content->section_delivery_annotation.length > 0) { - buf = content->section_delivery_annotation.buffer; - cursor = content->section_delivery_annotation.offset + qd_buffer_base(buf); - advance(&cursor, &buf, - content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length, - send_handler, (void*) pnl); - } + // + // Send delivery annotation if present + // + int da_consume = content->section_delivery_annotation.length + content->section_delivery_annotation.hdr_length; + if (content->section_delivery_annotation.length > 0) { + buf = content->section_delivery_annotation.buffer; + cursor = content->section_delivery_annotation.offset + qd_buffer_base(buf); + advance(&cursor, &buf, da_consume, send_handler, (void*) pnl); + } - // - // Send new message annotations - // - 
qd_buffer_t *da_buf = DEQ_HEAD(new_ma); - while (da_buf) { - char *to_send = (char*) qd_buffer_base(da_buf); - pn_link_send(pnl, to_send, qd_buffer_size(da_buf)); - da_buf = DEQ_NEXT(da_buf); - } - qd_buffer_list_free_buffers(&new_ma); + qd_buffer_list_t new_ma; + DEQ_INIT(new_ma); - // - // Skip over replaced message annotations - // - if (content->section_message_annotation.length > 0) - advance(&cursor, &buf, - content->section_message_annotation.hdr_length + content->section_message_annotation.length, - 0, 0); + // Process the message annotations if any + compose_message_annotations(msg, &new_ma, strip_annotations); + + // + // Send new message annotations + // + qd_buffer_t *da_buf = DEQ_HEAD(new_ma); + while (da_buf) { + char *to_send = (char*) qd_buffer_base(da_buf); + pn_link_send(pnl, to_send, qd_buffer_size(da_buf)); + da_buf = DEQ_NEXT(da_buf); + } + qd_buffer_list_free_buffers(&new_ma); + + // + // Skip over replaced message annotations + // + int ma_consume = content->section_message_annotation.hdr_length + content->section_message_annotation.length; + if (content->section_message_annotation.length > 0) + advance(&cursor, &buf, ma_consume, 0, 0); + + msg->cursor.buffer = buf; + + // + // If this message has no header and no delivery annotations and no message annotations, set the offset to 0. + // + if (header_consume == 0 && da_consume == 0 && ma_consume ==0) + msg->cursor.offset = 0; + else + msg->cursor.offset = cursor - qd_buffer_base(buf); + + msg->sent_depth = QD_DEPTH_MESSAGE_ANNOTATIONS; - // - // Send remaining partial buffer - // - if (buf) { - size_t len = qd_buffer_size(buf) - (cursor - qd_buffer_base(buf)); - advance(&cursor, &buf, len, send_handler, (void*) pnl); } - // Fall through to process the remaining buffers normally - // Note that 'advance' will have moved us to the next buffer in the chain. 
+ buf = msg->cursor.buffer; + if (!buf) + return; + + bool receive_complete = qd_message_receive_complete(in_msg); while (buf) { - pn_link_send(pnl, (char*) qd_buffer_base(buf), qd_buffer_size(buf)); - buf = DEQ_NEXT(buf); + size_t buf_size = qd_buffer_size(buf); + + // This will send the remaining data in the buffer if any. + pn_link_send(pnl, (char*) qd_buffer_at(buf, msg->cursor.offset), buf_size - msg->cursor.offset); + + // If the entire message has been received, there is no need to lock before sending because no one else is + // trying to modify the data structure. + if (!receive_complete) + sys_mutex_lock(msg->content->lock); --- End diff -- I wouldn't bother with the conditional lock - if there's no contention the cost of the lock is small, and the condition introduces one more way for a future programmer to screw up the thread safety logic by mistake.
--- If your project is set up for it, you can reply to this email and have your reply appear on GitHub as well. If your project does not have this feature enabled and would like it enabled, or if the feature is enabled but not working, please contact infrastructure at infrastruct...@apache.org or file a JIRA ticket with INFRA. --- --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For additional commands, e-mail: dev-h...@qpid.apache.org