This is an automated email from the ASF dual-hosted git repository.

tross pushed a commit to branch dev-protocol-adaptors
in repository https://gitbox.apache.org/repos/asf/qpid-dispatch.git

commit 9bf4ffd5ed2f391de05971106668047b1aa8c4c7
Author: Ted Ross <tr...@apache.org>
AuthorDate: Thu Jun 4 09:05:32 2020 -0400

    Dataplane: Added message method to set send-complete. Added reference code 
to receive messages (non streamed).
---
 include/qpid/dispatch/message.h  |  6 ++++++
 src/adaptors/reference_adaptor.c | 32 ++++++++++++++++++++++++++------
 src/message.c                    | 15 +++++++++++----
 3 files changed, 43 insertions(+), 10 deletions(-)

diff --git a/include/qpid/dispatch/message.h b/include/qpid/dispatch/message.h
index 030fc0c..29b2335 100644
--- a/include/qpid/dispatch/message.h
+++ b/include/qpid/dispatch/message.h
@@ -363,6 +363,12 @@ bool qd_message_receive_complete(qd_message_t *msg);
 bool qd_message_send_complete(qd_message_t *msg);
 
 /**
+ * Flag the message as being send-complete.
+ */
+void qd_message_set_send_complete(qd_message_t *msg);
+
+
+/**
  * Returns true if the delivery tag has already been sent.
  */
 bool qd_message_tag_sent(qd_message_t *msg);
diff --git a/src/adaptors/reference_adaptor.c b/src/adaptors/reference_adaptor.c
index 95801f2..1680e8d 100644
--- a/src/adaptors/reference_adaptor.c
+++ b/src/adaptors/reference_adaptor.c
@@ -25,7 +25,7 @@
 #include <stdio.h>
 #include <inttypes.h>
 
-static char *address = "echo-service";
+static char *address = "examples";
 
 typedef struct qdr_ref_adaptor_t {
     qdr_core_t             *core;
@@ -98,6 +98,8 @@ static void qdr_ref_second_attach(void *context, qdr_link_t 
*link,
                                                   0,                //const 
char       *terminus_addr,
                                                   &link_id);
 
+        qdr_link_flow(adaptor->core, adaptor->out_link, 10, false);
+
         qd_iterator_t *reply_iter = qdr_terminus_get_address(source);
         adaptor->reply_to = (char*) qd_iterator_copy(reply_iter);
         printf("qdr_ref_second_attach: reply-to=%s\n", adaptor->reply_to);
@@ -142,8 +144,8 @@ static void qdr_ref_flow(void *context, qdr_link_t *link, 
int credit)
         DEQ_INIT(buffers);
         buf = qd_buffer();
         char *insert = (char*) qd_buffer_cursor(buf);
-        strcpy(insert, "Test Payload");
-        qd_buffer_insert(buf, 13);
+        memcpy(insert, "\x00\x53\x77\xa1\x0cTest Payload", 17);
+        qd_buffer_insert(buf, 17);
         DEQ_INSERT_HEAD(buffers, buf);
 
         qd_message_compose_5(msg, props, &buffers, true);
@@ -172,19 +174,37 @@ static void qdr_ref_drain(void *context, qdr_link_t 
*link, bool mode)
 
 static int qdr_ref_push(void *context, qdr_link_t *link, int limit)
 {
-    return 0;
+    qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
+    printf("qdr_ref_push: limit=%d\n", limit);
+    return qdr_link_process_deliveries(adaptor->core, link, limit);
 }
 
 
 static uint64_t qdr_ref_deliver(void *context, qdr_link_t *link, 
qdr_delivery_t *delivery, bool settled)
 {
-    return 0;
+    qdr_ref_adaptor_t *adaptor = (qdr_ref_adaptor_t*) context;
+    qd_message_t      *msg     = qdr_delivery_message(delivery);
+
+    qd_message_depth_status_t status = qd_message_check_depth(msg, 
QD_DEPTH_BODY);
+
+    if (status == QD_MESSAGE_DEPTH_OK) {
+        qd_iterator_t *body_iter = qd_message_field_iterator(msg, 
QD_FIELD_BODY);
+        char *body = (char*) qd_iterator_copy(body_iter);
+        printf("qdr_ref_deliver: message received, body=%s\n", body);
+        free(body);
+        qd_iterator_free(body_iter);
+        qd_message_set_send_complete(msg);
+    }
+
+    qdr_link_flow(adaptor->core, link, 1, false);
+
+    return PN_ACCEPTED; // This will cause the delivery to be settled
 }
 
 
 static int qdr_ref_get_credit(void *context, qdr_link_t *link)
 {
-    return 0;
+    return 8;
 }
 
 
diff --git a/src/message.c b/src/message.c
index 06399cf..be0c7a8 100644
--- a/src/message.c
+++ b/src/message.c
@@ -1222,6 +1222,16 @@ bool qd_message_send_complete(qd_message_t *in_msg)
     return msg->send_complete;
 }
 
+
+void qd_message_set_send_complete(qd_message_t *in_msg)
+{
+    if (!!in_msg) {
+        qd_message_pvt_t *msg = (qd_message_pvt_t*) in_msg;
+        msg->send_complete = true;
+    }
+}
+
+
 bool qd_message_tag_sent(qd_message_t *in_msg)
 {
     if (!in_msg)
@@ -2192,10 +2202,7 @@ void qd_message_compose_5(qd_message_t        *msg,
         DEQ_APPEND(content->buffers, (*headers_buffers));
 
     if (body) {
-        qd_composed_field_t *field = qd_compose(QD_PERFORMATIVE_BODY_DATA, 0);
-        qd_compose_insert_binary_buffers(field, body);
-        DEQ_APPEND(content->buffers, (*qd_compose_buffers(field)));
-        qd_compose_free(field);
+        DEQ_APPEND(content->buffers, (*body));
     }
 
     content->receive_complete = complete;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to