This is an automated email from the ASF dual-hosted git repository. tross pushed a commit to branch dev-protocol-adaptors in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git
commit 9bf4ffd5ed2f391de05971106668047b1aa8c4c7 Author: Ted Ross <tr...@apache.org> AuthorDate: Thu Jun 4 09:05:32 2020 -0400 Dataplane: Added message method to set send-complete. Added reference code to receive messages (non streamed). --- include/qpid/dispatch/message.h | 6 ++++++ src/adaptors/reference_adaptor.c | 32 ++++++++++++++++++++++++++------ src/message.c | 15 +++++++++++---- 3 files changed, 43 insertions(+), 10 deletions(-) diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h index 030fc0c..29b2335 100644 --- a/include/qpid/dispatch/message.h +++ b/include/qpid/dispatch/message.h @@ -363,6 +363,12 @@ bool qd_message_receive_complete(qd_message_t *msg); bool qd_message_send_complete(qd_message_t *msg); /** + * Flag the message as being send-complete. + */ +void qd_message_set_send_complete(qd_message_t *msg); + + +/** * Returns true if the delivery tag has already been sent. */ bool qd_message_tag_sent(qd_message_t *msg); diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c index 95801f2..1680e8d 100644 --- a/src/adaptors/reference_adaptor.c +++ b/src/adaptors/reference_adaptor.c @@ -25,7 +25,7 @@ #include <stdio.h> #include <inttypes.h> -static char *address = "echo-service"; +static char *address = "examples"; typedef struct qdr_ref_adaptor_t { qdr_core_t *core; @@ -98,6 +98,8 @@ static void qdr_ref_second_attach(void *context, qdr_link_t *link, 0, //const char *terminus_addr, &link_id); + qdr_link_flow(adaptor->core, adaptor->out_link, 10, false); + qd_iterator_t *reply_iter = qdr_terminus_get_address(source); adaptor->reply_to = (char*) qd_iterator_copy(reply_iter); printf("qdr_ref_second_attach: reply-to=%s\n", adaptor->reply_to); @@ -142,8 +144,8 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, int credit) DEQ_INIT(buffers); buf = qd_buffer(); char *insert = (char*) qd_buffer_cursor(buf); - strcpy(insert, "Test Payload"); - qd_buffer_insert(buf, 13); + memcpy(insert, "\x00\x53\x77\xa1\x0cTest Payload", 17); + qd_buffer_insert(buf, 17); DEQ_INSERT_HEAD(buffers, buf); qd_message_compose_5(msg, props, &buffers, true); @@ -172,19 +174,37 @@ static void qdr_ref_drain(void *context, qdr_link_t *link, bool mode) static int qdr_ref_push(void *context, qdr_link_t *link, int limit) { - return 0; + qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; + printf("qdr_ref_push: limit=%d\n", limit); + return qdr_link_process_deliveries(adaptor->core, link, limit); } static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, qdr_delivery_t *delivery, bool settled) { - return 0; + qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context; + qd_message_t *msg = qdr_delivery_message(delivery); + + qd_message_depth_status_t status = qd_message_check_depth(msg, QD_DEPTH_BODY); + + if (status == QD_MESSAGE_DEPTH_OK) { + qd_iterator_t *body_iter = qd_message_field_iterator(msg, QD_FIELD_BODY); + char *body = (char*) qd_iterator_copy(body_iter); + printf("qdr_ref_deliver: message received, body=%s\n", body); + free(body); + qd_iterator_free(body_iter); + qd_message_set_send_complete(msg); + } + + qdr_link_flow(adaptor->core, link, 1, false); + + return PN_ACCEPTED; // This will cause the delivery to be settled } static int qdr_ref_get_credit(void *context, qdr_link_t *link) { - return 0; + return 8; } diff --git a/src/message.c b/src/message.c index 06399cf..be0c7a8 100644 --- a/src/message.c +++ b/src/message.c @@ -1222,6 +1222,16 @@ bool qd_message_send_complete(qd_message_t *in_msg) return msg->send_complete; } + +void qd_message_set_send_complete(qd_message_t *in_msg) +{ + if (!!in_msg) { + qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg; + msg->send_complete = true; + } +} + + bool qd_message_tag_sent(qd_message_t *in_msg) { if (!in_msg) @@ -2192,10 +2202,7 @@ void qd_message_compose_5(qd_message_t *msg, DEQ_APPEND(content->buffers, (*headers_buffers)); if (body) { - qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0); - qd_compose_insert_binary_buffers(field, body); - DEQ_APPEND(content->buffers, (*qd_compose_buffers(field))); - qd_compose_free(field); + DEQ_APPEND(content->buffers, (*body)); } content->receive_complete = complete; --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org