http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt deleted file mode 100644 index f701651..0000000 --- a/examples/c/proactor/CMakeLists.txt +++ /dev/null @@ -1,43 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -find_package(Proton REQUIRED) - -include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS}) - -add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION}) - -find_package(Libuv) -if (Libuv_FOUND) - foreach(name broker send receive) - add_executable(libuv_${name} ${name}.c libuv_proactor.c) - target_link_libraries(libuv_${name} ${Proton_LIBRARIES} ${Libuv_LIBRARIES}) - set_target_properties(libuv_${name} PROPERTIES - COMPILE_DEFINITIONS "PN_PROACTOR_INCLUDE=\"libuv_proactor.h\"") - endforeach() - - # Add a test with the correct environment to find test executables and valgrind. - if(WIN32) - set(test_path "$<TARGET_FILE_DIR:libuv_broker>;$<TARGET_FILE_DIR:qpid-proton>") - else(WIN32) - set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}") - endif(WIN32) - set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV}) - add_test(c-proactor-libuv ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py) -endif()
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/README.dox ---------------------------------------------------------------------- diff --git a/examples/c/proactor/README.dox b/examples/c/proactor/README.dox deleted file mode 100644 index 4b09cb7..0000000 --- a/examples/c/proactor/README.dox +++ /dev/null @@ -1,17 +0,0 @@ -/** - * @example send.c - * - * Send a fixed number of messages to the "example" node. - * - * @example receive.c - * - * Subscribes to the 'example' node and prints the message bodies - * received. - * - * @example broker.c - * - * A simple multithreaded broker that works with the send and receive - * examples. - * - * __Requires C++11__ - */ http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/broker.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c deleted file mode 100644 index ca52336..0000000 --- a/examples/c/proactor/broker.c +++ /dev/null @@ -1,488 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <proton/connection_driver.h> -#include <proton/proactor.h> -#include <proton/engine.h> -#include <proton/sasl.h> -#include <proton/transport.h> -#include <proton/url.h> - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> - -/* TODO aconway 2016-10-14: this example does not require libuv IO, - it uses uv.h only for portable mutex and thread functions. -*/ -#include <uv.h> - -bool enable_debug = false; - -void debug(const char* fmt, ...) { - if (enable_debug) { - va_list(ap); - va_start(ap, fmt); - vfprintf(stderr, fmt, ap); - fputc('\n', stderr); - fflush(stderr); - } -} - -void check(int err, const char* s) { - if (err != 0) { - perror(s); - exit(1); - } -} - -void pcheck(int err, const char* s) { - if (err != 0) { - fprintf(stderr, "%s: %s", s, pn_code(err)); - exit(1); - } -} - -/* Simple re-sizable vector that acts as a queue */ -#define VEC(T) struct { T* data; size_t len, cap; } - -#define VEC_INIT(V) \ - do { \ - V.len = 0; \ - V.cap = 16; \ - void **vp = (void**)&V.data; \ - *vp = malloc(V.cap * sizeof(*V.data)); \ - } while(0) - -#define VEC_FINAL(V) free(V.data) - -#define VEC_PUSH(V, X) \ - do { \ - if (V.len == V.cap) { \ - V.cap *= 2; \ - void **vp = (void**)&V.data; \ - *vp = realloc(V.data, V.cap * sizeof(*V.data)); \ - } \ - V.data[V.len++] = X; \ - } while(0) \ - -#define VEC_POP(V) \ - do { \ - if (V.len > 0) \ - memmove(V.data, V.data+1, (--V.len)*sizeof(*V.data)); \ - } while(0) - -/* Simple thread-safe queue implementation */ -typedef struct queue_t { - uv_mutex_t lock; - char* name; - VEC(pn_rwbytes_t) messages; /* Messages on the queue_t */ - VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */ - struct queue_t *next; /* Next queue in chain */ - size_t sent; /* Count of messages sent, used as delivery tag */ -} queue_t; - -static void queue_init(queue_t *q, const char* name, queue_t *next) { - debug("created queue %s", name); - uv_mutex_init(&q->lock); - q->name = strdup(name); - VEC_INIT(q->messages); - VEC_INIT(q->waiting); - q->next = next; - q->sent = 0; -} - -static void queue_destroy(queue_t *q) { - uv_mutex_destroy(&q->lock); - free(q->name); - for (size_t i = 0; i < q->messages.len; ++i) - free(q->messages.data[i].start); - VEC_FINAL(q->messages); - for (size_t i = 0; i < q->waiting.len; ++i) - pn_decref(q->waiting.data[i]); - VEC_FINAL(q->waiting); -} - -/* Send a message on s, or record s as eating if no messages. - Called in s dispatch loop, assumes s has credit. -*/ -static void queue_send(queue_t *q, pn_link_t *s) { - pn_rwbytes_t m = { 0 }; - size_t tag = 0; - uv_mutex_lock(&q->lock); - if (q->messages.len == 0) { /* Empty, record connection as waiting */ - debug("queue is empty %s", q->name); - /* Record connection for wake-up if not already on the list. */ - pn_connection_t *c = pn_session_connection(pn_link_session(s)); - size_t i = 0; - for (; i < q->waiting.len && q->waiting.data[i] != c; ++i) - ; - if (i == q->waiting.len) { - VEC_PUSH(q->waiting, c); - } - } else { - debug("sending from queue %s", q->name); - m = q->messages.data[0]; - VEC_POP(q->messages); - tag = ++q->sent; - } - uv_mutex_unlock(&q->lock); - if (m.start) { - pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag))); - pn_link_send(s, m.start, m.size); - pn_link_advance(s); - pn_delivery_settle(d); /* Pre-settled: unreliable, there will bea no ack/ */ - free(m.start); - } -} - -/* Data associated with each broker connection */ -typedef struct broker_data_t { - bool check_queues; /* Check senders on the connection for available data in queues. */ -} broker_data_t; - -/* Use the context pointer as a boolean flag to indicate we need to check queues */ -void pn_connection_set_check_queues(pn_connection_t *c, bool check) { - pn_connection_set_context(c, (void*)check); -} - -bool pn_connection_get_check_queues(pn_connection_t *c) { - return (bool)pn_connection_get_context(c); -} - -/* Put a message on the queue, called in receiver dispatch loop. - If the queue was previously empty, notify waiting senders. -*/ -static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) { - debug("received to queue %s", q->name); - uv_mutex_lock(&q->lock); - VEC_PUSH(q->messages, m); - if (q->messages.len == 1) { /* Was empty, notify waiting connections */ - for (size_t i = 0; i < q->waiting.len; ++i) { - pn_connection_t *c = q->waiting.data[i]; - pn_connection_set_check_queues(c, true); - pn_connection_wake(c); /* Wake the connection */ - } - q->waiting.len = 0; - } - uv_mutex_unlock(&q->lock); -} - -/* Thread safe set of queues */ -typedef struct queues_t { - uv_mutex_t lock; - queue_t *queues; - size_t sent; -} queues_t; - -void queues_init(queues_t *qs) { - uv_mutex_init(&qs->lock); - qs->queues = NULL; -} - -void queues_destroy(queues_t *qs) { - for (queue_t *q = qs->queues; q; q = q->next) { - queue_destroy(q); - free(q); - } - uv_mutex_destroy(&qs->lock); -} - -/** Get or create the named queue. */ -queue_t* queues_get(queues_t *qs, const char* name) { - uv_mutex_lock(&qs->lock); - queue_t *q; - for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next) - ; - if (!q) { - q = (queue_t*)malloc(sizeof(queue_t)); - queue_init(q, name, qs->queues); - qs->queues = q; - } - uv_mutex_unlock(&qs->lock); - return q; -} - -/* The broker implementation */ -typedef struct broker_t { - pn_proactor_t *proactor; - queues_t queues; - const char *container_id; /* AMQP container-id */ - size_t threads; - pn_millis_t heartbeat; - bool finished; -} broker_t; - -void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) { - memset(b, 0, sizeof(*b)); - b->proactor = pn_proactor(); - queues_init(&b->queues); - b->container_id = container_id; - b->threads = threads; - b->heartbeat = 0; -} - -void broker_stop(broker_t *b) { - /* In this broker an interrupt stops a thread, stopping all threads stops the broker */ - for (size_t i = 0; i < b->threads; ++i) - pn_proactor_interrupt(b->proactor); -} - -/* Try to send if link is sender and has credit */ -static void link_send(broker_t *b, pn_link_t *s) { - if (pn_link_is_sender(s) && pn_link_credit(s) > 0) { - const char *qname = pn_terminus_get_address(pn_link_source(s)); - queue_t *q = queues_get(&b->queues, qname); - queue_send(q, s); - } -} - -static void queue_unsub(queue_t *q, pn_connection_t *c) { - uv_mutex_lock(&q->lock); - for (size_t i = 0; i < q->waiting.len; ++i) { - if (q->waiting.data[i] == c){ - q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */ - VEC_POP(q->waiting); - break; - } - } - uv_mutex_unlock(&q->lock); -} - -/* Unsubscribe from the queue of interest to this link. */ -static void link_unsub(broker_t *b, pn_link_t *s) { - if (pn_link_is_sender(s)) { - const char *qname = pn_terminus_get_address(pn_link_source(s)); - if (qname) { - queue_t *q = queues_get(&b->queues, qname); - queue_unsub(q, pn_session_connection(pn_link_session(s))); - } - } -} - -/* Called in connection's event loop when a connection is woken for messages.*/ -static void connection_unsub(broker_t *b, pn_connection_t *c) { - for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0)) - link_unsub(b, l); -} - -static void session_unsub(broker_t *b, pn_session_t *ssn) { - pn_connection_t *c = pn_session_connection(ssn); - for (pn_link_t *l = pn_link_head(c, 0); l != NULL; l = pn_link_next(l, 0)) { - if (pn_link_session(l) == ssn) - link_unsub(b, l); - } -} - -static void check_condition(pn_event_t *e, pn_condition_t *cond) { - if (pn_condition_is_set(cond)) { - const char *ename = e ? pn_event_type_name(pn_event_type(e)) : "UNKNOWN"; - fprintf(stderr, "%s: %s: %s\n", ename, - pn_condition_get_name(cond), pn_condition_get_description(cond)); - } -} - -const int WINDOW=10; /* Incoming credit window */ - -static void handle(broker_t* b, pn_event_t* e) { - pn_connection_t *c = pn_event_connection(e); - - switch (pn_event_type(e)) { - - case PN_LISTENER_ACCEPT: - pn_listener_accept(pn_event_listener(e), pn_connection()); - break; - - case PN_CONNECTION_INIT: - pn_connection_set_container(c, b->container_id); - break; - - case PN_CONNECTION_BOUND: { - /* Turn off security */ - pn_transport_t *t = pn_connection_transport(c); - pn_transport_require_auth(t, false); - pn_sasl_allowed_mechs(pn_sasl(t), "ANONYMOUS"); - pn_transport_set_idle_timeout(t, 2 * b->heartbeat); - } - case PN_CONNECTION_REMOTE_OPEN: { - pn_connection_open(pn_event_connection(e)); /* Complete the open */ - break; - } - case PN_CONNECTION_WAKE: { - if (pn_connection_get_check_queues(c)) { - pn_connection_set_check_queues(c, false); - int flags = PN_LOCAL_ACTIVE&PN_REMOTE_ACTIVE; - for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags)) - link_send(b, l); - } - break; - } - case PN_SESSION_REMOTE_OPEN: { - pn_session_open(pn_event_session(e)); - break; - } - case PN_LINK_REMOTE_OPEN: { - pn_link_t *l = pn_event_link(e); - if (pn_link_is_sender(l)) { - const char *source = pn_terminus_get_address(pn_link_remote_source(l)); - pn_terminus_set_address(pn_link_source(l), source); - } else { - const char* target = pn_terminus_get_address(pn_link_remote_target(l)); - pn_terminus_set_address(pn_link_target(l), target); - pn_link_flow(l, WINDOW); - } - pn_link_open(l); - break; - } - case PN_LINK_FLOW: { - link_send(b, pn_event_link(e)); - break; - } - case PN_DELIVERY: { - pn_delivery_t *d = pn_event_delivery(e); - pn_link_t *r = pn_delivery_link(d); - if (pn_link_is_receiver(r) && - pn_delivery_readable(d) && !pn_delivery_partial(d)) - { - size_t size = pn_delivery_pending(d); - /* The broker does not decode the message, just forwards it. */ - pn_rwbytes_t m = { size, (char*)malloc(size) }; - pn_link_recv(r, m.start, m.size); - const char *qname = pn_terminus_get_address(pn_link_target(r)); - queue_receive(b->proactor, queues_get(&b->queues, qname), m); - pn_delivery_update(d, PN_ACCEPTED); - pn_delivery_settle(d); - pn_link_flow(r, WINDOW - pn_link_credit(r)); - } - break; - } - - case PN_TRANSPORT_CLOSED: - connection_unsub(b, pn_event_connection(e)); - check_condition(e, pn_transport_condition(pn_event_transport(e))); - break; - - case PN_CONNECTION_REMOTE_CLOSE: - check_condition(e, pn_connection_remote_condition(pn_event_connection(e))); - connection_unsub(b, pn_event_connection(e)); - pn_connection_close(pn_event_connection(e)); - break; - - case PN_SESSION_REMOTE_CLOSE: - check_condition(e, pn_session_remote_condition(pn_event_session(e))); - session_unsub(b, pn_event_session(e)); - pn_session_close(pn_event_session(e)); - pn_session_free(pn_event_session(e)); - break; - - case PN_LINK_REMOTE_CLOSE: - check_condition(e, pn_link_remote_condition(pn_event_link(e))); - link_unsub(b, pn_event_link(e)); - pn_link_close(pn_event_link(e)); - pn_link_free(pn_event_link(e)); - break; - - case PN_LISTENER_CLOSE: - check_condition(e, pn_listener_condition(pn_event_listener(e))); - break; - - case PN_PROACTOR_INACTIVE: /* listener and all connections closed */ - broker_stop(b); - break; - - case PN_PROACTOR_INTERRUPT: - b->finished = true; - break; - - default: - break; - } -} - -static void broker_thread(void *void_broker) { - broker_t *b = (broker_t*)void_broker; - do { - pn_event_batch_t *events = pn_proactor_wait(b->proactor); - pn_event_t *e; - while ((e = pn_event_batch_next(events))) { - handle(b, e); - } - pn_proactor_done(b->proactor, events); - } while(!b->finished); -} - -static void usage(const char *arg0) { - fprintf(stderr, "Usage: %s [-d] [-a url] [-t thread-count]\n", arg0); - exit(1); -} - -int main(int argc, char **argv) { - /* Command line options */ - char *urlstr = NULL; - char container_id[256]; - /* Default container-id is program:pid */ - snprintf(container_id, sizeof(container_id), "%s:%d", argv[0], getpid()); - size_t nthreads = 4; - pn_millis_t heartbeat = 0; - int opt; - while ((opt = getopt(argc, argv, "a:t:dh:c:")) != -1) { - switch (opt) { - case 'a': urlstr = optarg; break; - case 't': nthreads = atoi(optarg); break; - case 'd': enable_debug = true; break; - case 'h': heartbeat = atoi(optarg); break; - case 'c': strncpy(container_id, optarg, sizeof(container_id)); break; - default: usage(argv[0]); break; - } - } - if (optind < argc) - usage(argv[0]); - - broker_t b; - broker_init(&b, container_id, nthreads, heartbeat); - - /* Parse the URL or use default values */ - pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL; - /* Listen on IPv6 wildcard. On systems that do not set IPV6ONLY by default, - this will also listen for mapped IPv4 on the same port. - */ - const char *host = url ? pn_url_get_host(url) : "::"; - const char *port = url ? pn_url_get_port(url) : "amqp"; - pn_proactor_listen(b.proactor, pn_listener(), host, port, 16); - printf("listening on '%s:%s' %zd threads\n", host, port, b.threads); - - if (url) pn_url_free(url); - if (b.threads <= 0) { - fprintf(stderr, "invalid value -t %zu, threads must be > 0\n", b.threads); - exit(1); - } - /* Start n-1 threads and use main thread */ - uv_thread_t* threads = (uv_thread_t*)calloc(sizeof(uv_thread_t), b.threads); - for (size_t i = 0; i < b.threads-1; ++i) { - check(uv_thread_create(&threads[i], broker_thread, &b), "pthread_create"); - } - broker_thread(&b); /* Use the main thread too. */ - for (size_t i = 0; i < b.threads-1; ++i) { - check(uv_thread_join(&threads[i]), "pthread_join"); - } - pn_proactor_free(b.proactor); - free(threads); - return 0; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/libuv_proactor.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c deleted file mode 100644 index 42bbfab..0000000 --- a/examples/c/proactor/libuv_proactor.c +++ /dev/null @@ -1,873 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <uv.h> - -#include <proton/condition.h> -#include <proton/connection_driver.h> -#include <proton/engine.h> -#include <proton/message.h> -#include <proton/object.h> -#include <proton/proactor.h> -#include <proton/transport.h> -#include <proton/url.h> - -#include <assert.h> -#include <stddef.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -/* - libuv loop functions are thread unsafe. The only exception is uv_async_send() - which is a thread safe "wakeup" that can wake the uv_loop from another thread. - - To provide concurrency the proactor uses a "leader-worker-follower" model, - threads take turns at the roles: - - - a single "leader" calls libuv functions and runs the uv_loop in short bursts - to generate work. When there is work available it gives up leadership and - becomes a "worker" - - - "workers" handle events concurrently for distinct connections/listeners - They do as much work as they can get, when none is left they become "followers" - - - "followers" wait for the leader to generate work and become workers. - When the leader itself becomes a worker, one of the followers takes over. - - This model is symmetric: any thread can take on any role based on run-time - requirements. It also allows the IO and non-IO work associated with an IO - wake-up to be processed in a single thread with no context switches. - - Function naming: - - on_ - called in leader thread via uv_run(). - - leader_ - called in leader thread, while processing the leader_q. - - owner_ - called in owning thread, leader or worker but not concurrently. - - Note on_ and leader_ functions can call each other, the prefix indicates the - path they are most often called on. -*/ - -const char *COND_NAME = "proactor"; -const char *AMQP_PORT = "5672"; -const char *AMQP_PORT_NAME = "amqp"; -const char *AMQPS_PORT = "5671"; -const char *AMQPS_PORT_NAME = "amqps"; - -PN_HANDLE(PN_PROACTOR) - -/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management. - Class definitions are for identification as pn_event_t context only. -*/ -PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor) -PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener) - -/* common to connection and listener */ -typedef struct psocket_t { - /* Immutable */ - pn_proactor_t *proactor; - - /* Protected by proactor.lock */ - struct psocket_t* next; - void (*wakeup)(struct psocket_t*); /* interrupting action for leader */ - - /* Only used by leader */ - uv_tcp_t tcp; - void (*action)(struct psocket_t*); /* deferred action for leader */ - bool is_conn:1; - char host[NI_MAXHOST]; - char port[NI_MAXSERV]; -} psocket_t; - -/* Special value for psocket.next pointer when socket is not on any any list. */ -psocket_t UNLISTED; - -static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) { - ps->proactor = p; - ps->next = &UNLISTED; - ps->is_conn = is_conn; - ps->tcp.data = ps; - - /* For platforms that don't know about "amqp" and "amqps" service names. */ - if (strcmp(port, AMQP_PORT_NAME) == 0) - port = AMQP_PORT; - else if (strcmp(port, AMQPS_PORT_NAME) == 0) - port = AMQPS_PORT; - /* Set to "\001" to indicate a NULL as opposed to an empty string "" */ - strncpy(ps->host, host ? host : "\001", sizeof(ps->host)); - strncpy(ps->port, port ? port : "\001", sizeof(ps->port)); -} - -/* Turn "\001" back to NULL */ -static inline const char* fixstr(const char* str) { - return str[0] == '\001' ? NULL : str; -} - -typedef struct pconnection_t { - psocket_t psocket; - - /* Only used by owner thread */ - pn_connection_driver_t driver; - - /* Only used by leader */ - uv_connect_t connect; - uv_timer_t timer; - uv_write_t write; - uv_shutdown_t shutdown; - size_t writing; - bool reading:1; - bool server:1; /* accept, not connect */ -} pconnection_t; - -struct pn_listener_t { - psocket_t psocket; - - /* Only used by owner thread */ - pconnection_t *accepting; /* accept in progress */ - pn_condition_t *condition; - pn_collector_t *collector; - pn_event_batch_t batch; - pn_record_t *attachments; - void *context; - size_t backlog; -}; - - -typedef struct queue { psocket_t *front, *back; } queue; - -struct pn_proactor_t { - /* Leader thread */ - uv_cond_t cond; - uv_loop_t loop; - uv_async_t async; - uv_timer_t timer; - - /* Owner thread: proactor collector and batch can belong to leader or a worker */ - pn_collector_t *collector; - pn_event_batch_t batch; - - /* Protected by lock */ - uv_mutex_t lock; - queue start_q; - queue worker_q; - queue leader_q; - size_t interrupt; /* pending interrupts */ - pn_millis_t timeout; - size_t count; /* psocket count */ - bool inactive:1; - bool timeout_request:1; - bool timeout_elapsed:1; - bool has_leader:1; - bool batch_working:1; /* batch belongs to a worker. */ -}; - -static bool push_lh(queue *q, psocket_t *ps) { - if (ps->next != &UNLISTED) /* Don't move if already listed. */ - return false; - ps->next = NULL; - if (!q->front) { - q->front = q->back = ps; - } else { - q->back->next = ps; - q->back = ps; - } - return true; -} - -static psocket_t* pop_lh(queue *q) { - psocket_t *ps = q->front; - if (ps) { - q->front = ps->next; - ps->next = &UNLISTED; - } - return ps; -} - -static inline pconnection_t *as_pconnection_t(psocket_t* ps) { - return ps->is_conn ? (pconnection_t*)ps : NULL; -} - -static inline pn_listener_t *as_listener(psocket_t* ps) { - return ps->is_conn ? NULL: (pn_listener_t*)ps; -} - -/* Put ps on the leader queue for processing. Thread safe. */ -static void to_leader_lh(psocket_t *ps) { - push_lh(&ps->proactor->leader_q, ps); - uv_async_send(&ps->proactor->async); /* Wake leader */ -} - -static void to_leader(psocket_t *ps) { - uv_mutex_lock(&ps->proactor->lock); - to_leader_lh(ps); - uv_mutex_unlock(&ps->proactor->lock); -} - -/* Detach from IO and put ps on the worker queue */ -static void leader_to_worker(psocket_t *ps) { - if (ps->is_conn) { - pconnection_t *pc = as_pconnection_t(ps); - /* Don't detach if there are no events yet. */ - if (pn_connection_driver_has_event(&pc->driver)) { - if (pc->writing) { - pc->writing = 0; - uv_cancel((uv_req_t*)&pc->write); - } - if (pc->reading) { - pc->reading = false; - uv_read_stop((uv_stream_t*)&pc->psocket.tcp); - } - if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { - uv_timer_stop(&pc->timer); - } - } - } else { - pn_listener_t *l = as_listener(ps); - uv_read_stop((uv_stream_t*)&l->psocket.tcp); - } - uv_mutex_lock(&ps->proactor->lock); - push_lh(&ps->proactor->worker_q, ps); - uv_mutex_unlock(&ps->proactor->lock); -} - -/* Set a deferred action for leader, if not already set. */ -static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) { - uv_mutex_lock(&ps->proactor->lock); - if (!ps->action) { - ps->action = action; - } - to_leader_lh(ps); - uv_mutex_unlock(&ps->proactor->lock); -} - -/* Owner thread send to worker thread. Set deferred action if not already set. */ -static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) { - uv_mutex_lock(&ps->proactor->lock); - if (!ps->action) { - ps->action = action; - } - push_lh(&ps->proactor->worker_q, ps); - uv_async_send(&ps->proactor->async); /* Wake leader */ - uv_mutex_unlock(&ps->proactor->lock); -} - - -/* Re-queue for further work */ -static void worker_requeue(psocket_t* ps) { - uv_mutex_lock(&ps->proactor->lock); - push_lh(&ps->proactor->worker_q, ps); - uv_async_send(&ps->proactor->async); /* Wake leader */ - uv_mutex_unlock(&ps->proactor->lock); -} - -static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) { - pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc)); - if (!pc) return NULL; - if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) { - return NULL; - } - psocket_init(&pc->psocket, p, true, host, port); - if (server) { - pn_transport_set_server(pc->driver.transport); - } - pn_record_t *r = pn_connection_attachments(pc->driver.connection); - pn_record_def(r, PN_PROACTOR, PN_VOID); - pn_record_set(r, PN_PROACTOR, pc); - return pc; -} - -static pn_event_t *listener_batch_next(pn_event_batch_t *batch); -static pn_event_t *proactor_batch_next(pn_event_batch_t *batch); - -static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) { - return (batch->next_event == proactor_batch_next) ? - (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL; -} - -static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) { - return (batch->next_event == listener_batch_next) ? - (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL; -} - -static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) { - pn_connection_driver_t *d = pn_event_batch_connection_driver(batch); - return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL; -} - -static void leader_count(pn_proactor_t *p, int change) { - uv_mutex_lock(&p->lock); - p->count += change; - p->inactive = (p->count == 0); - uv_mutex_unlock(&p->lock); -} - -/* Free if there are no uv callbacks pending and no events */ -static void leader_pconnection_t_maybe_free(pconnection_t *pc) { - if (pn_connection_driver_has_event(&pc->driver)) { - leader_to_worker(&pc->psocket); /* Return to worker */ - } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) { - /* All UV requests are finished */ - pn_connection_driver_destroy(&pc->driver); - leader_count(pc->psocket.proactor, -1); - free(pc); - } -} - -/* Free if there are no uv callbacks pending and no events */ -static void leader_listener_maybe_free(pn_listener_t *l) { - if (pn_collector_peek(l->collector)) { - leader_to_worker(&l->psocket); /* Return to worker */ - } else if (!l->psocket.tcp.data) { - pn_condition_free(l->condition); - leader_count(l->psocket.proactor, -1); - free(l); - } -} - -/* Free if there are no uv callbacks pending and no events */ -static void leader_maybe_free(psocket_t *ps) { - if (ps->is_conn) { - leader_pconnection_t_maybe_free(as_pconnection_t(ps)); - } else { - leader_listener_maybe_free(as_listener(ps)); - } -} - -static void on_close(uv_handle_t *h) { - psocket_t *ps = (psocket_t*)h->data; - h->data = NULL; /* Mark closed */ - leader_maybe_free(ps); -} - -static void on_shutdown(uv_shutdown_t *shutdown, int err) { - psocket_t *ps = (psocket_t*)shutdown->data; - shutdown->data = NULL; /* Mark closed */ - leader_maybe_free(ps); -} - -static inline void leader_close(psocket_t *ps) { - if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) { - uv_close((uv_handle_t*)&ps->tcp, on_close); - } - pconnection_t *pc = as_pconnection_t(ps); - if (pc) { - pn_connection_driver_close(&pc->driver); - if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { - uv_timer_stop(&pc->timer); - uv_close((uv_handle_t*)&pc->timer, on_close); - } - } - leader_maybe_free(ps); -} - -static pconnection_t *get_pconnection_t(pn_connection_t* c) { - if (!c) return NULL; - pn_record_t *r = pn_connection_attachments(c); - return (pconnection_t*) pn_record_get(r, PN_PROACTOR); -} - -static void leader_error(psocket_t *ps, int err, const char* what) { - if (ps->is_conn) { - pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver; - pn_connection_driver_bind(driver); /* Bind so errors will be reported */ - pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s", - what, fixstr(ps->host), fixstr(ps->port), - uv_strerror(err)); - pn_connection_driver_close(driver); - } else { - pn_listener_t *l = as_listener(ps); - pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s", - what, fixstr(ps->host), fixstr(ps->port), - uv_strerror(err)); - pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE); - } - leader_to_worker(ps); /* Worker to handle the error */ -} - -/* uv-initialization */ -static int leader_init(psocket_t *ps) { - leader_count(ps->proactor, +1); - int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp); - if (!err) { - pconnection_t *pc = as_pconnection_t(ps); - if (pc) { - pc->connect.data = ps; - int err = uv_timer_init(&ps->proactor->loop, &pc->timer); - if (!err) { - pc->timer.data = pc; - } - } - } - if (err) { - leader_error(ps, err, "initialization"); - } - return err; -} - -/* Common logic for on_connect and on_accept */ -static void leader_connect_accept(pconnection_t *pc, int err, const char *what) { - if (!err) { - leader_to_worker(&pc->psocket); - } else { - leader_error(&pc->psocket, err, what); - } -} - -static void on_connect(uv_connect_t *connect, int err) { - leader_connect_accept((pconnection_t*)connect->data, err, "on connect"); -} - -static void on_accept(uv_stream_t* server, int err) { - pn_listener_t *l = (pn_listener_t*) server->data; - if (err) { - leader_error(&l->psocket, err, "on accept"); - } - pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); - leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */ -} - -static void leader_accept(psocket_t *ps) { - pn_listener_t * l = as_listener(ps); - pconnection_t *pc = l->accepting; - l->accepting = NULL; - if (pc) { - int err = leader_init(&pc->psocket); - if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp); - leader_connect_accept(pc, err, "on accept"); - } -} - -static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) { - int err = leader_init(ps); - struct addrinfo hints = { 0 }; - if (server) hints.ai_flags = AI_PASSIVE; - if (!err) { - err = uv_getaddrinfo(&ps->proactor->loop, info, NULL, fixstr(ps->host), fixstr(ps->port), &hints); - } - return err; -} - -static void leader_connect(psocket_t *ps) { - pconnection_t *pc = as_pconnection_t(ps); - uv_getaddrinfo_t info; - int err = leader_resolve(ps, &info, false); - if (!err) { - err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect); - uv_freeaddrinfo(info.addrinfo); - } - if (err) { - leader_error(ps, err, "connect to"); - } -} - -static void leader_listen(psocket_t *ps) { - pn_listener_t *l = as_listener(ps); - uv_getaddrinfo_t info; - int err = leader_resolve(ps, &info, true); - if (!err) { - err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0); - uv_freeaddrinfo(info.addrinfo); - } - if (!err) err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept); - if (err) { - leader_error(ps, err, "listen on "); - } -} - -static void on_tick(uv_timer_t *timer) { - pconnection_t *pc = (pconnection_t*)timer->data; - pn_transport_t *t = pc->driver.transport; - if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) { - uv_timer_stop(&pc->timer); - uint64_t now = uv_now(pc->timer.loop); - uint64_t next = pn_transport_tick(t, now); - if (next) { - uv_timer_start(&pc->timer, on_tick, next - now, 0); - } - } -} - -static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { - pconnection_t *pc = (pconnection_t*)stream->data; - if (nread >= 0) { - pn_connection_driver_read_done(&pc->driver, nread); - on_tick(&pc->timer); /* check for tick changes. */ - leader_to_worker(&pc->psocket); - /* Reading continues automatically until stopped. */ - } else if (nread == UV_EOF) { /* hangup */ - pn_connection_driver_read_close(&pc->driver); - leader_maybe_free(&pc->psocket); - } else { - leader_error(&pc->psocket, nread, "on read from"); - } -} - -static void on_write(uv_write_t* write, int err) { - pconnection_t *pc = (pconnection_t*)write->data; - write->data = NULL; - if (err == 0) { - pn_connection_driver_write_done(&pc->driver, pc->writing); - leader_to_worker(&pc->psocket); - } else if (err == UV_ECANCELED) { - leader_maybe_free(&pc->psocket); - } else { - leader_error(&pc->psocket, err, "on write to"); - } - pc->writing = 0; /* Need to send a new write request */ -} - -static void on_timeout(uv_timer_t *timer) { - pn_proactor_t *p = (pn_proactor_t*)timer->data; - uv_mutex_lock(&p->lock); - p->timeout_elapsed = true; - uv_mutex_unlock(&p->lock); -} - -// Read buffer allocation function for uv, just returns the transports read buffer. -static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) { - pconnection_t *pc = (pconnection_t*)stream->data; - pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); - *buf = uv_buf_init(rbuf.start, rbuf.size); -} - -static void leader_rewatch(psocket_t *ps) { - int err = 0; - if (ps->is_conn) { - pconnection_t *pc = as_pconnection_t(ps); - if (pc->timer.data) { /* uv-initialized */ - on_tick(&pc->timer); /* Re-enable ticks if required */ - } - pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); - pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); - - /* Ticks and checking buffers can generate events, process before proceeding */ - if (pn_connection_driver_has_event(&pc->driver)) { - leader_to_worker(ps); - } else { /* Re-watch for IO */ - if (wbuf.size > 0 && !pc->writing) { - pc->writing = wbuf.size; - uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size); - pc->write.data = ps; - uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write); - } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) { - pc->shutdown.data = ps; - uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown); - } - if (rbuf.size > 0 && !pc->reading) { - pc->reading = true; - err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read); - } - } - } else { - pn_listener_t *l = as_listener(ps); - err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept); - } - if (err) { - leader_error(ps, err, "rewatch"); - } -} - -/* Set the event in the proactor's batch */ -static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) { - pn_collector_put(p->collector, pn_proactor__class(), p, t); - p->batch_working = true; - return &p->batch; -} - -/* Return the next event batch or 0 if no events are ready */ -static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) { - if (!p->batch_working) { /* Can generate proactor events */ - if (p->inactive) { - p->inactive = false; - return proactor_batch_lh(p, PN_PROACTOR_INACTIVE); - } - if (p->interrupt > 0) { - --p->interrupt; - return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT); - } - if (p->timeout_elapsed) { - p->timeout_elapsed = false; - return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT); - } - } - for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) { - if (ps->is_conn) { - pconnection_t *pc = as_pconnection_t(ps); - return &pc->driver.batch; - } else { /* Listener */ - pn_listener_t *l = as_listener(ps); - return &l->batch; - } - to_leader(ps); /* No event, back to leader */ - } - return 0; -} - -/* Called in any thread to set a wakeup action. Replaces any previous wakeup action. */ -static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) { - uv_mutex_lock(&ps->proactor->lock); - ps->wakeup = action; - to_leader_lh(ps); - uv_mutex_unlock(&ps->proactor->lock); -} - -pn_listener_t *pn_event_listener(pn_event_t *e) { - return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL; -} - -pn_proactor_t *pn_event_proactor(pn_event_t *e) { - if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e); - pn_listener_t *l = pn_event_listener(e); - if (l) return l->psocket.proactor; - pn_connection_t *c = pn_event_connection(e); - if (c) return pn_connection_proactor(pn_event_connection(e)); - return NULL; -} - -void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { - pconnection_t *pc = batch_pconnection(batch); - if (pc) { - if (pn_connection_driver_has_event(&pc->driver)) { - /* Process all events before going back to IO. */ - worker_requeue(&pc->psocket); - } else if (pn_connection_driver_finished(&pc->driver)) { - owner_to_leader(&pc->psocket, leader_close); - } else { - owner_to_leader(&pc->psocket, leader_rewatch); - } - return; - } - pn_listener_t *l = batch_listener(batch); - if (l) { - owner_to_leader(&l->psocket, leader_rewatch); - return; - } - pn_proactor_t *bp = batch_proactor(batch); - if (bp == p) { - uv_mutex_lock(&p->lock); - p->batch_working = false; - uv_async_send(&p->async); /* Wake leader */ - uv_mutex_unlock(&p->lock); - return; - } -} - -/* Run follower/leader loop till we can return an event and be a worker */ -pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { - uv_mutex_lock(&p->lock); - /* Try to grab work immediately. */ - pn_event_batch_t *batch = get_batch_lh(p); - if (batch == NULL) { - /* No work available, follow the leader */ - while (p->has_leader) { - uv_cond_wait(&p->cond, &p->lock); - } - /* Lead till there is work to do. */ - p->has_leader = true; - while (batch == NULL) { - if (p->timeout_request) { - p->timeout_request = false; - if (p->timeout) { - uv_timer_start(&p->timer, on_timeout, p->timeout, 0); - } else { - uv_timer_stop(&p->timer); - } - } - for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) { - void (*action)(psocket_t*) = ps->action; - void (*wakeup)(psocket_t*) = ps->wakeup; - ps->action = NULL; - ps->wakeup = NULL; - if (action || wakeup) { - uv_mutex_unlock(&p->lock); - if (action) action(ps); - if (wakeup) wakeup(ps); - uv_mutex_lock(&p->lock); - } - } - batch = get_batch_lh(p); - if (batch == NULL) { - uv_mutex_unlock(&p->lock); - uv_run(&p->loop, UV_RUN_ONCE); - uv_mutex_lock(&p->lock); - } - } - /* Signal the next leader and return to work */ - p->has_leader = false; - uv_cond_signal(&p->cond); - } - uv_mutex_unlock(&p->lock); - return batch; -} - -void pn_proactor_interrupt(pn_proactor_t *p) { - uv_mutex_lock(&p->lock); - ++p->interrupt; - uv_async_send(&p->async); /* Interrupt the UV loop */ - uv_mutex_unlock(&p->lock); -} - -void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { - uv_mutex_lock(&p->lock); - p->timeout = t; - p->timeout_request = true; - uv_async_send(&p->async); /* Interrupt the UV loop */ - uv_mutex_unlock(&p->lock); -} - -int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) { - pconnection_t *pc = new_pconnection_t(p, c, false, host, port); - if (!pc) { - return PN_OUT_OF_MEMORY; - } - /* Process PN_CONNECTION_INIT before binding */ - owner_to_worker(&pc->psocket, leader_connect); - return 0; -} - -int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog) -{ - psocket_init(&l->psocket, p, false, host, port); - l->backlog = backlog; - owner_to_leader(&l->psocket, leader_listen); - return 0; -} - -pn_proactor_t *pn_connection_proactor(pn_connection_t* c) { - pconnection_t *pc = get_pconnection_t(c); - return pc ? pc->psocket.proactor : NULL; -} - -void leader_wake_connection(psocket_t *ps) { - pconnection_t *pc = as_pconnection_t(ps); - pn_connection_t *c = pc->driver.connection; - pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE); - leader_to_worker(ps); -} - -void pn_connection_wake(pn_connection_t* c) { - wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection); -} - -pn_proactor_t *pn_proactor() { - pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p)); - p->collector = pn_collector(); - p->batch.next_event = &proactor_batch_next; - if (!p->collector) return NULL; - uv_loop_init(&p->loop); - uv_mutex_init(&p->lock); - uv_cond_init(&p->cond); - uv_async_init(&p->loop, &p->async, NULL); - uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */ - p->timer.data = p; - return p; -} - -static void on_stopping(uv_handle_t* h, void* v) { - uv_close(h, NULL); /* Close this handle */ - if (!uv_loop_alive(h->loop)) /* Everything closed */ - uv_stop(h->loop); /* Stop the loop, pn_proactor_destroy() can return */ -} - -void pn_proactor_free(pn_proactor_t *p) { - uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */ - uv_run(&p->loop, UV_RUN_DEFAULT); /* Run till stop, all handles closed */ - uv_loop_close(&p->loop); - uv_mutex_destroy(&p->lock); - uv_cond_destroy(&p->cond); - pn_collector_free(p->collector); - free(p); -} - -static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { - pn_listener_t *l = batch_listener(batch); - pn_event_t *handled = pn_collector_prev(l->collector); - if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) { - owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */ - } - return pn_collector_next(l->collector); -} - -static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { - return pn_collector_next(batch_proactor(batch)->collector); -} - -static void pn_listener_free(pn_listener_t *l) { - if (l) { - if (!l->collector) pn_collector_free(l->collector); - if (!l->condition) pn_condition_free(l->condition); - if (!l->attachments) pn_free(l->attachments); - free(l); - } -} - -pn_listener_t *pn_listener() { - pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t)); - if (l) { - l->batch.next_event = listener_batch_next; - l->collector = pn_collector(); - l->condition = pn_condition(); - l->attachments = pn_record(); - if (!l->condition || !l->collector || !l->attachments) { - pn_listener_free(l); - return NULL; - } - } - return l; -} - -void pn_listener_close(pn_listener_t* l) { - wakeup(&l->psocket, leader_close); -} - -pn_proactor_t *pn_listener_proactor(pn_listener_t* l) { - return l ? l->psocket.proactor : NULL; -} - -pn_condition_t* pn_listener_condition(pn_listener_t* l) { - return l->condition; -} - -void *pn_listener_get_context(pn_listener_t *l) { - return l->context; -} - -void pn_listener_set_context(pn_listener_t *l, void *context) { - l->context = context; -} - -pn_record_t *pn_listener_attachments(pn_listener_t *l) { - return l->attachments; -} - -int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { - if (l->accepting) { - return PN_STATE_ERR; /* Only one at a time */ - } - l->accepting = new_pconnection_t( - l->psocket.proactor, c, true, l->psocket.host, l->psocket.port); - if (!l->accepting) { - return UV_ENOMEM; - } - owner_to_leader(&l->psocket, leader_accept); - return 0; -} - http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/receive.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c deleted file mode 100644 index b8edcd6..0000000 --- a/examples/c/proactor/receive.c +++ /dev/null @@ -1,205 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <proton/connection.h> -#include <proton/connection_driver.h> -#include <proton/delivery.h> -#include <proton/proactor.h> -#include <proton/link.h> -#include <proton/message.h> -#include <proton/session.h> -#include <proton/transport.h> -#include <proton/url.h> - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> - -typedef char str[1024]; - -typedef struct app_data_t { - str address; - str container_id; - pn_rwbytes_t message_buffer; - int message_count; - int received; - pn_proactor_t *proactor; - bool finished; -} app_data_t; - -static const int BATCH = 100; /* Batch size for unlimited receive */ - -static int exit_code = 0; - -static void check_condition(pn_event_t *e, pn_condition_t *cond) { - if (pn_condition_is_set(cond)) { - exit_code = 1; - fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), - pn_condition_get_name(cond), pn_condition_get_description(cond)); - } -} - -#define MAX_SIZE 1024 - -static void decode_message(pn_delivery_t *dlv) { - static char buffer[MAX_SIZE]; - ssize_t len; - // try to decode the message body - if (pn_delivery_pending(dlv) < MAX_SIZE) { - // read in the raw data - len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE); - if (len > 0) { - // decode it into a proton message - pn_message_t *m = pn_message(); - if (PN_OK == pn_message_decode(m, buffer, len)) { - pn_string_t *s = pn_string(NULL); - pn_inspect(pn_message_body(m), s); - printf("%s\n", pn_string_get(s)); - pn_free(s); - } - pn_message_free(m); - } - } -} - -static void handle(app_data_t* app, pn_event_t* event) { - switch (pn_event_type(event)) { - - case PN_CONNECTION_INIT: { - pn_connection_t* c = pn_event_connection(event); - pn_connection_set_container(c, app->container_id); - pn_connection_open(c); - pn_session_t* s = pn_session(c); - pn_session_open(s); - pn_link_t* l = pn_receiver(s, "my_receiver"); - pn_terminus_set_address(pn_link_source(l), app->address); - pn_link_open(l); - /* cannot receive without granting credit: */ - pn_link_flow(l, app->message_count ? app->message_count : BATCH); - } break; - - case PN_DELIVERY: { - /* A message has been received */ - pn_link_t *link = NULL; - pn_delivery_t *dlv = pn_event_delivery(event); - if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { - link = pn_delivery_link(dlv); - decode_message(dlv); - /* Accept the delivery */ - pn_delivery_update(dlv, PN_ACCEPTED); - /* done with the delivery, move to the next and free it */ - pn_link_advance(link); - pn_delivery_settle(dlv); /* dlv is now freed */ - - if (app->message_count == 0) { - /* receive forever - see if more credit is needed */ - if (pn_link_credit(link) < BATCH/2) { - /* Grant enough credit to bring it up to BATCH: */ - pn_link_flow(link, BATCH - pn_link_credit(link)); - } - } else if (++app->received >= app->message_count) { - /* done receiving, close the endpoints */ - printf("%d messages received\n", app->received); - pn_session_t *ssn = pn_link_session(link); - pn_link_close(link); - pn_session_close(ssn); - pn_connection_close(pn_session_connection(ssn)); - } - } - } break; - - case PN_TRANSPORT_ERROR: - check_condition(event, pn_transport_condition(pn_event_transport(event))); - break; - - case PN_CONNECTION_REMOTE_CLOSE: - check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); - pn_connection_close(pn_event_connection(event)); - break; - - case PN_SESSION_REMOTE_CLOSE: - check_condition(event, pn_session_remote_condition(pn_event_session(event))); - pn_connection_close(pn_event_connection(event)); - break; - - case PN_LINK_REMOTE_CLOSE: - case PN_LINK_REMOTE_DETACH: - check_condition(event, pn_link_remote_condition(pn_event_link(event))); - pn_connection_close(pn_event_connection(event)); - break; - - case PN_PROACTOR_INACTIVE: - app->finished = true; - break; - - default: break; - } -} - -static void usage(const char *arg0) { - fprintf(stderr, "Usage: %s [-a url] [-m message-count]\n", arg0); - exit(1); -} - -int main(int argc, char **argv) { - /* Default values for application and connection. */ - app_data_t app = {{0}}; - app.message_count = 100; - const char* urlstr = NULL; - - int opt; - while((opt = getopt(argc, argv, "a:m:")) != -1) { - switch(opt) { - case 'a': urlstr = optarg; break; - case 'm': app.message_count = atoi(optarg); break; - default: usage(argv[0]); break; - } - } - if (optind < argc) - usage(argv[0]); - - snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid()); - - /* Parse the URL or use default values */ - pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL; - const char *host = url ? pn_url_get_host(url) : NULL; - const char *port = url ? pn_url_get_port(url) : "amqp"; - strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address)); - - /* Create the proactor and connect */ - app.proactor = pn_proactor(); - pn_proactor_connect(app.proactor, pn_connection(), host, port); - if (url) pn_url_free(url); - - do { - pn_event_batch_t *events = pn_proactor_wait(app.proactor); - pn_event_t *e; - while ((e = pn_event_batch_next(events))) { - handle(&app, e); - } - pn_proactor_done(app.proactor, events); - } while(!app.finished); - - pn_proactor_free(app.proactor); - free(app.message_buffer.start); - return exit_code; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/send.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c deleted file mode 100644 index d611b3d..0000000 --- a/examples/c/proactor/send.c +++ /dev/null @@ -1,234 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <proton/connection.h> -#include <proton/connection_driver.h> -#include <proton/delivery.h> -#include <proton/proactor.h> -#include <proton/link.h> -#include <proton/message.h> -#include <proton/session.h> -#include <proton/transport.h> -#include <proton/url.h> - -#include <stdio.h> -#include <stdlib.h> -#include <string.h> -#include <unistd.h> - -typedef char str[1024]; - -typedef struct app_data_t { - str address; - str container_id; - pn_rwbytes_t message_buffer; - int message_count; - int sent; - int acknowledged; - pn_proactor_t *proactor; - pn_millis_t delay; - bool delaying; - pn_link_t *sender; - bool finished; -} app_data_t; - -int exit_code = 0; - -static void check_condition(pn_event_t *e, pn_condition_t *cond) { - if (pn_condition_is_set(cond)) { - exit_code = 1; - fprintf(stderr, "%s: %s: %s\n", pn_event_type_name(pn_event_type(e)), - pn_condition_get_name(cond), pn_condition_get_description(cond)); - } -} - -/* Create a message with a map { "sequence" : number } encode it and return the encoded buffer. */ -static pn_bytes_t encode_message(app_data_t* app) { - /* Construct a message with the map { "sequence": app.sent } */ - pn_message_t* message = pn_message(); - pn_data_put_int(pn_message_id(message), app->sent); /* Set the message_id also */ - pn_data_t* body = pn_message_body(message); - pn_data_put_map(body); - pn_data_enter(body); - pn_data_put_string(body, pn_bytes(sizeof("sequence")-1, "sequence")); - pn_data_put_int(body, app->sent); /* The sequence number */ - pn_data_exit(body); - - /* encode the message, expanding the encode buffer as needed */ - if (app->message_buffer.start == NULL) { - static const size_t initial_size = 128; - app->message_buffer = pn_rwbytes(initial_size, (char*)malloc(initial_size)); - } - /* app->message_buffer is the total buffer space available. */ - /* mbuf wil point at just the portion used by the encoded message */ - pn_rwbytes_t mbuf = pn_rwbytes(app->message_buffer.size, app->message_buffer.start); - int status = 0; - while ((status = pn_message_encode(message, mbuf.start, &mbuf.size)) == PN_OVERFLOW) { - app->message_buffer.size *= 2; - app->message_buffer.start = (char*)realloc(app->message_buffer.start, app->message_buffer.size); - mbuf.size = app->message_buffer.size; - } - if (status != 0) { - fprintf(stderr, "error encoding message: %s\n", pn_error_text(pn_message_error(message))); - exit(1); - } - pn_message_free(message); - return pn_bytes(mbuf.size, mbuf.start); -} - -static void send(app_data_t* app) { - while (pn_link_credit(app->sender) > 0 && app->sent < app->message_count) { - ++app->sent; - // Use sent counter bytes as unique delivery tag. - pn_delivery(app->sender, pn_dtag((const char *)&app->sent, sizeof(app->sent))); - pn_bytes_t msgbuf = encode_message(app); - pn_link_send(app->sender, msgbuf.start, msgbuf.size); - pn_link_advance(app->sender); - if (app->delay && app->sent < app->message_count) { - /* If delay is set, wait for TIMEOUT event to send more */ - app->delaying = true; - pn_proactor_set_timeout(app->proactor, app->delay); - break; - } - } -} - -static void handle(app_data_t* app, pn_event_t* event) { - switch (pn_event_type(event)) { - - case PN_CONNECTION_INIT: { - pn_connection_t* c = pn_event_connection(event); - pn_connection_set_container(c, app->container_id); - pn_connection_open(c); - pn_session_t* s = pn_session(c); - pn_session_open(s); - pn_link_t* l = pn_sender(s, "my_sender"); - pn_terminus_set_address(pn_link_target(l), app->address); - pn_link_open(l); - } break; - - case PN_LINK_FLOW: - /* The peer has given us some credit, now we can send messages */ - if (!app->delaying) { - app->sender = pn_event_link(event); - send(app); - } - break; - - case PN_PROACTOR_TIMEOUT: - /* Wake the sender's connection */ - pn_connection_wake(pn_session_connection(pn_link_session(app->sender))); - break; - - case PN_CONNECTION_WAKE: - /* Timeout, we can send more. */ - app->delaying = false; - send(app); - break; - - case PN_DELIVERY: { - /* We received acknowledgedment from the peer that a message was delivered. */ - pn_delivery_t* d = pn_event_delivery(event); - if (pn_delivery_remote_state(d) == PN_ACCEPTED) { - if (++app->acknowledged == app->message_count) { - printf("%d messages sent and acknowledged\n", app->acknowledged); - pn_connection_close(pn_event_connection(event)); - } - } - } break; - - case PN_TRANSPORT_CLOSED: - check_condition(event, pn_transport_condition(pn_event_transport(event))); - break; - - case PN_CONNECTION_REMOTE_CLOSE: - check_condition(event, pn_connection_remote_condition(pn_event_connection(event))); - pn_connection_close(pn_event_connection(event)); - break; - - case PN_SESSION_REMOTE_CLOSE: - check_condition(event, pn_session_remote_condition(pn_event_session(event))); - pn_connection_close(pn_event_connection(event)); - break; - - case PN_LINK_REMOTE_CLOSE: - case PN_LINK_REMOTE_DETACH: - check_condition(event, pn_link_remote_condition(pn_event_link(event))); - pn_connection_close(pn_event_connection(event)); - break; - - case PN_PROACTOR_INACTIVE: - app->finished = true; - break; - - default: break; - } -} - -static void usage(const char *arg0) { - fprintf(stderr, "Usage: %s [-a url] [-m message-count] [-d delay-ms]\n", arg0); - exit(1); -} - -int main(int argc, char **argv) { - /* Default values for application and connection. */ - app_data_t app = {{0}}; - app.message_count = 100; - const char* urlstr = NULL; - - int opt; - while((opt = getopt(argc, argv, "a:m:d:")) != -1) { - switch(opt) { - case 'a': urlstr = optarg; break; - case 'm': app.message_count = atoi(optarg); break; - case 'd': app.delay = atoi(optarg); break; - default: usage(argv[0]); break; - } - } - if (optind < argc) - usage(argv[0]); - - snprintf(app.container_id, sizeof(app.container_id), "%s:%d", argv[0], getpid()); - - /* Parse the URL or use default values */ - pn_url_t *url = urlstr ? pn_url_parse(urlstr) : NULL; - const char *host = url ? pn_url_get_host(url) : NULL; - const char *port = url ? pn_url_get_port(url) : "amqp"; - strncpy(app.address, (url && pn_url_get_path(url)) ? pn_url_get_path(url) : "example", sizeof(app.address)); - - /* Create the proactor and connect */ - app.proactor = pn_proactor(); - pn_proactor_connect(app.proactor, pn_connection(), host, port); - if (url) pn_url_free(url); - - do { - pn_event_batch_t *events = pn_proactor_wait(app.proactor); - pn_event_t *e; - while ((e = pn_event_batch_next(events))) { - handle(&app, e); - } - pn_proactor_done(app.proactor, events); - } while(!app.finished); - - pn_proactor_free(app.proactor); - free(app.message_buffer.start); - return exit_code; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/proactor/test.py ---------------------------------------------------------------------- diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py deleted file mode 100644 index a86425d..0000000 --- a/examples/c/proactor/test.py +++ /dev/null @@ -1,60 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License -# - -# This is a test script to run the examples and verify that they behave as expected. - -from exampletest import * - -import unittest -import sys - -def python_cmd(name): - dir = os.path.dirname(__file__) - return [sys.executable, os.path.join(dir, "..", "..", "python", name)] - -def receive_expect(n): - return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n - -class CExampleTest(BrokerTestCase): - broker_exe = ["libuv_broker"] - - def test_send_receive(self): - """Send first then receive""" - s = self.proc(["libuv_send", "-a", self.addr]) - self.assertEqual("100 messages sent and acknowledged\n", s.wait_out()) - r = self.proc(["libuv_receive", "-a", self.addr]) - self.assertEqual(receive_expect(100), r.wait_out()) - - def test_receive_send(self): - """Start receiving first, then send.""" - r = self.proc(["libuv_receive", "-a", self.addr]); - s = self.proc(["libuv_send", "-a", self.addr]); - self.assertEqual("100 messages sent and acknowledged\n", s.wait_out()) - self.assertEqual(receive_expect(100), r.wait_out()) - - def test_timed_send(self): - """Send with timed delay""" - s = self.proc(["libuv_send", "-a", self.addr, "-d100", "-m3"]) - self.assertEqual("3 messages sent and acknowledged\n", s.wait_out()) - r = self.proc(["libuv_receive", "-a", self.addr, "-m3"]) - self.assertEqual(receive_expect(3), r.wait_out()) - - -if __name__ == "__main__": - unittest.main() http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/reactor/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/c/reactor/CMakeLists.txt b/examples/c/reactor/CMakeLists.txt deleted file mode 100644 index bd6163f..0000000 --- a/examples/c/reactor/CMakeLists.txt +++ /dev/null @@ -1,45 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. -# - -find_package(Proton REQUIRED) - -set (reactor-examples - sender.c - receiver.c - ) - -set_source_files_properties ( - ${reactor-examples} - PROPERTIES - COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${COMPILE_LANGUAGE_FLAGS} ${LINK_TIME_OPTIMIZATION}" - ) - -if (BUILD_WITH_CXX) - set_source_files_properties ( - ${reactor-examples} - PROPERTIES LANGUAGE CXX - ) -endif (BUILD_WITH_CXX) - -include_directories(${Proton_INCLUDE_DIRS}) -add_executable(sender sender.c) -add_executable(receiver receiver.c) -target_link_libraries(sender ${Proton_LIBRARIES}) -target_link_libraries(receiver ${Proton_LIBRARIES}) - http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/reactor/README ---------------------------------------------------------------------- diff --git a/examples/c/reactor/README b/examples/c/reactor/README deleted file mode 100644 index 8d61893..0000000 --- a/examples/c/reactor/README +++ /dev/null @@ -1,30 +0,0 @@ -These example clients require a broker or similar intermediary that -supports the AMQP 1.0 protocol, allows anonymous connections and -accepts links to and from a node named 'examples'. - ------------------------------------------------------------------- - -sender.c - -A simple message sending client. This example sends all messages but -the last as pre-settled (no ack required). It then pends waiting for -an ack for the last message sent before exiting. - -Use the '-h' command line option for a list of supported parameters. - ------------------------------------------------------------------- - -receiver.c - -A simple message consuming client. This example receives messages -from a target (default 'examples'). Received messages are -acknowledged if they are sent un-settled. The client will try to -decode the message payload assuming it has been generated by the -sender example. - -Use the '-h' command line option for a list of supported parameters. - - - - - http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/examples/c/reactor/receiver.c ---------------------------------------------------------------------- diff --git a/examples/c/reactor/receiver.c b/examples/c/reactor/receiver.c deleted file mode 100644 index 35c5a70..0000000 --- a/examples/c/reactor/receiver.c +++ /dev/null @@ -1,286 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <stdlib.h> -#include <stdio.h> -#include <string.h> - -#include "pncompat/misc_funcs.inc" - -#include "proton/reactor.h" -#include "proton/message.h" -#include "proton/connection.h" -#include "proton/session.h" -#include "proton/link.h" -#include "proton/delivery.h" -#include "proton/event.h" -#include "proton/handlers.h" -#include "proton/transport.h" -#include "proton/url.h" - -static int quiet = 0; - -// Credit batch if unlimited receive (-c 0) -static const int CAPACITY = 100; -#define MAX_SIZE 512 - -// Example application data. This data will be instantiated in the event -// handler, and is available during event processing. In this example it -// holds configuration and state information. -// -typedef struct { - int count; // # of messages to receive before exiting - const char *source; // name of the source node to receive from - pn_message_t *message; // holds the received message -} app_data_t; - -// helper to pull pointer to app_data_t instance out of the pn_handler_t -// -#define GET_APP_DATA(handler) ((app_data_t *)pn_handler_mem(handler)) - -// Called when reactor exits to clean up app_data -// -static void delete_handler(pn_handler_t *handler) -{ - app_data_t *d = GET_APP_DATA(handler); - if (d->message) { - pn_decref(d->message); - d->message = NULL; - } -} - - -/* Process each event posted by the reactor. - */ -static void event_handler(pn_handler_t *handler, - pn_event_t *event, - pn_event_type_t type) -{ - app_data_t *data = GET_APP_DATA(handler); - - switch (type) { - - case PN_CONNECTION_INIT: { - // Create and open all the endpoints needed to send a message - // - pn_connection_t *conn; - pn_session_t *ssn; - pn_link_t *receiver; - - conn = pn_event_connection(event); - pn_connection_open(conn); - ssn = pn_session(conn); - pn_session_open(ssn); - receiver = pn_receiver(ssn, "MyReceiver"); - pn_terminus_set_address(pn_link_source(receiver), data->source); - pn_link_open(receiver); - // cannot receive without granting credit: - pn_link_flow(receiver, data->count ? data->count : CAPACITY); - } break; - - case PN_DELIVERY: { - // A message has been received - // - pn_link_t *link = NULL; - pn_delivery_t *dlv = pn_event_delivery(event); - if (pn_delivery_readable(dlv) && !pn_delivery_partial(dlv)) { - // A full message has arrived - if (!quiet) { - static char buffer[MAX_SIZE]; - ssize_t len; - pn_bytes_t bytes; - bool found = false; - - // try to decode the message body - if (pn_delivery_pending(dlv) < MAX_SIZE) { - // read in the raw data - len = pn_link_recv(pn_delivery_link(dlv), buffer, MAX_SIZE); - if (len > 0) { - // decode it into a proton message - pn_message_clear(data->message); - if (PN_OK == pn_message_decode(data->message, buffer, - len)) { - // Assuming the message came from the sender - // example, try to parse out a single string from - // the payload - // - pn_data_scan(pn_message_body(data->message), "?S", - &found, &bytes); - } - } - } - if (found) { - fprintf(stdout, "Message: [%.*s]\n", (int)bytes.size, - bytes.start); - } else { - fprintf(stdout, "Message received!\n"); - } - } - - link = pn_delivery_link(dlv); - - if (!pn_delivery_settled(dlv)) { - // remote has not settled, so it is tracking the delivery. Ack - // it. - pn_delivery_update(dlv, PN_ACCEPTED); - } - - // done with the delivery, move to the next and free it - pn_link_advance(link); - pn_delivery_settle(dlv); // dlv is now freed - - if (data->count == 0) { - // send forever - see if more credit is needed - if (pn_link_credit(link) < CAPACITY/2) { - // Grant enough credit to bring it up to CAPACITY: - pn_link_flow(link, CAPACITY - pn_link_credit(link)); - } - } else if (--data->count == 0) { - // done receiving, close the endpoints - pn_session_t *ssn = pn_link_session(link); - pn_link_close(link); - pn_session_close(ssn); - pn_connection_close(pn_session_connection(ssn)); - } - } - } break; - - case PN_TRANSPORT_ERROR: { - // The connection to the peer failed. - // - pn_transport_t *tport = pn_event_transport(event); - pn_condition_t *cond = pn_transport_condition(tport); - fprintf(stderr, "Network transport failed!\n"); - if (pn_condition_is_set(cond)) { - const char *name = pn_condition_get_name(cond); - const char *desc = pn_condition_get_description(cond); - fprintf(stderr, " Error: %s Description: %s\n", - (name) ? name : "<error name not provided>", - (desc) ? desc : "<no description provided>"); - } - // pn_reactor_process() will exit with a false return value, stopping - // the main loop. - } break; - - default: - break; - } -} - -static void usage(void) -{ - printf("Usage: receiver <options>\n"); - printf("-a \tThe host address [localhost:5672]\n"); - printf("-c \t# of messages to receive, 0=receive forever [1]\n"); - printf("-s \tSource address [examples]\n"); - printf("-i \tContainer name [ReceiveExample]\n"); - printf("-q \tQuiet - turn off stdout\n"); - exit(1); -} - -int main(int argc, char** argv) -{ - const char *address = "localhost"; - const char *container = "ReceiveExample"; - int c; - pn_reactor_t *reactor = NULL; - pn_url_t *url = NULL; - pn_connection_t *conn = NULL; - - /* create a handler for the connection's events. - * event_handler will be called for each event. The handler will allocate - * a app_data_t instance which can be accessed when the event_handler is - * called. - */ - pn_handler_t *handler = pn_handler_new(event_handler, - sizeof(app_data_t), - delete_handler); - - /* set up the application data with defaults */ - app_data_t *app_data = GET_APP_DATA(handler); - memset(app_data, 0, sizeof(app_data_t)); - app_data->count = 1; - app_data->source = "examples"; - app_data->message = pn_message(); - - /* Attach the pn_handshaker() handler. This handler deals with endpoint - * events from the peer so we don't have to. - */ - { - pn_handler_t *handshaker = pn_handshaker(); - pn_handler_add(handler, handshaker); - pn_decref(handshaker); - } - - /* command line options */ - opterr = 0; - while((c = getopt(argc, argv, "i:a:c:s:qh")) != -1) { - switch(c) { - case 'h': usage(); break; - case 'a': address = optarg; break; - case 'c': - app_data->count = atoi(optarg); - if (app_data->count < 0) usage(); - break; - case 's': app_data->source = optarg; break; - case 'i': container = optarg; break; - case 'q': quiet = 1; break; - default: - usage(); - break; - } - } - - reactor = pn_reactor(); - - url = pn_url_parse(address); - if (url == NULL) { - fprintf(stderr, "Invalid host address %s\n", address); - exit(1); - } - conn = pn_reactor_connection_to_host(reactor, - pn_url_get_host(url), - pn_url_get_port(url), - handler); - pn_decref(url); - pn_decref(handler); - - // the container name should be unique for each client - pn_connection_set_container(conn, container); - - // wait up to 5 seconds for activity before returning from - // pn_reactor_process() - pn_reactor_set_timeout(reactor, 5000); - - pn_reactor_start(reactor); - - while (pn_reactor_process(reactor)) { - /* Returns 'true' until the connection is shut down. - * pn_reactor_process() will return true at least once every 5 seconds - * (due to the timeout). If no timeout was configured, - * pn_reactor_process() returns as soon as it finishes processing all - * pending I/O and events. Once the connection has closed, - * pn_reactor_process() will return false. - */ - } - pn_decref(reactor); - - return 0; -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
