Repository: qpid-proton Updated Branches: refs/heads/master b9a57e8a7 -> ec70d73dd
PROTON-1403: c proactor library Move the libuv example proactor into an installed library. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/afacb165 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/afacb165 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/afacb165 Branch: refs/heads/master Commit: afacb16527e9f231ae76d5e16ca0d9ac7edcff86 Parents: b9a57e8 Author: Alan Conway <[email protected]> Authored: Fri Feb 10 21:43:05 2017 -0500 Committer: Alan Conway <[email protected]> Committed: Fri Feb 10 21:49:32 2017 -0500 ---------------------------------------------------------------------- examples/c/proactor/CMakeLists.txt | 33 +- examples/c/proactor/broker.c | 44 +- examples/c/proactor/libuv_proactor.c | 873 ------------------------ examples/c/proactor/test.py | 14 +- examples/c/proactor/thread.h | 49 ++ proton-c/CMakeLists.txt | 74 +- proton-c/include/proton/import_export.h | 7 + proton-c/include/proton/listener.h | 4 +- proton-c/include/proton/proactor.h | 26 +- proton-c/include/proton/types.h | 17 +- proton-c/src/libqpid-proton-proactor.pc.in | 30 + proton-c/src/proactor/libuv.c | 873 ++++++++++++++++++++++++ 12 files changed, 1074 insertions(+), 970 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/c/proactor/CMakeLists.txt b/examples/c/proactor/CMakeLists.txt index f701651..2ed4f94 100644 --- a/examples/c/proactor/CMakeLists.txt +++ b/examples/c/proactor/CMakeLists.txt @@ -23,21 +23,20 @@ include_directories(${CMAKE_CURRENT_SOURCE_DIR} ${Proton_INCLUDE_DIRS}) add_definitions(${COMPILE_WARNING_FLAGS} ${WERROR} ${COMPILE_PLATFORM_FLAGS} ${LINK_TIME_OPTIMIZATION}) -find_package(Libuv) -if (Libuv_FOUND) - foreach(name broker send receive) - add_executable(libuv_${name} ${name}.c libuv_proactor.c) - target_link_libraries(libuv_${name} ${Proton_LIBRARIES} ${Libuv_LIBRARIES}) - set_target_properties(libuv_${name} PROPERTIES - COMPILE_DEFINITIONS "PN_PROACTOR_INCLUDE=\"libuv_proactor.h\"") - endforeach() +# Add a test with the correct environment to find test executables and valgrind. +if(WIN32) + set(test_path "$<TARGET_FILE_DIR:broker>;$<TARGET_FILE_DIR:qpid-proton>") +else(WIN32) + set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}") + set(PLATFORM_LIBS pthread) +endif(WIN32) + +foreach(name broker send receive) + add_executable(proactor-${name} ${name}.c) + target_link_libraries(proactor-${name} ${Proton_LIBRARIES} ${PLATFORM_LIBS}) + set_target_properties(proactor-${name} PROPERTIES OUTPUT_NAME ${name}) +endforeach() + +set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV}) +add_test(c-proactor ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py -v) - # Add a test with the correct environment to find test executables and valgrind. - if(WIN32) - set(test_path "$<TARGET_FILE_DIR:libuv_broker>;$<TARGET_FILE_DIR:qpid-proton>") - else(WIN32) - set(test_path "${CMAKE_CURRENT_BINARY_DIR}:$ENV{PATH}") - endif(WIN32) - set(run_env ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py ${EXAMPLE_ENV} "PATH=${test_path}" ${VALGRIND_ENV}) - add_test(c-proactor-libuv ${run_env} -- ${PYTHON_EXECUTABLE} ${CMAKE_CURRENT_SOURCE_DIR}/test.py) -endif() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/broker.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c index ca52336..d6261f4 100644 --- a/examples/c/proactor/broker.c +++ b/examples/c/proactor/broker.c @@ -17,6 +17,8 @@ * under the License. */ +#include "thread.h" + #include <proton/connection_driver.h> #include <proton/proactor.h> #include <proton/engine.h> @@ -29,11 +31,6 @@ #include <string.h> #include <unistd.h> -/* TODO aconway 2016-10-14: this example does not require libuv IO, - it uses uv.h only for portable mutex and thread functions. -*/ -#include <uv.h> - bool enable_debug = false; void debug(const char* fmt, ...) { @@ -91,7 +88,7 @@ void pcheck(int err, const char* s) { /* Simple thread-safe queue implementation */ typedef struct queue_t { - uv_mutex_t lock; + pthread_mutex_t lock; char* name; VEC(pn_rwbytes_t) messages; /* Messages on the queue_t */ VEC(pn_connection_t*) waiting; /* Connections waiting to send messages from this queue */ @@ -101,7 +98,7 @@ typedef struct queue_t { static void queue_init(queue_t *q, const char* name, queue_t *next) { debug("created queue %s", name); - uv_mutex_init(&q->lock); + pthread_mutex_init(&q->lock, NULL); q->name = strdup(name); VEC_INIT(q->messages); VEC_INIT(q->waiting); @@ -110,7 +107,7 @@ static void queue_init(queue_t *q, const char* name, queue_t *next) { } static void queue_destroy(queue_t *q) { - uv_mutex_destroy(&q->lock); + pthread_mutex_destroy(&q->lock); free(q->name); for (size_t i = 0; i < q->messages.len; ++i) free(q->messages.data[i].start); @@ -126,7 +123,7 @@ static void queue_destroy(queue_t *q) { static void queue_send(queue_t *q, pn_link_t *s) { pn_rwbytes_t m = { 0 }; size_t tag = 0; - uv_mutex_lock(&q->lock); + pthread_mutex_lock(&q->lock); if (q->messages.len == 0) { /* Empty, record connection as waiting */ debug("queue is empty %s", q->name); /* Record connection for wake-up if not already on the list. */ @@ -143,7 +140,7 @@ static void queue_send(queue_t *q, pn_link_t *s) { VEC_POP(q->messages); tag = ++q->sent; } - uv_mutex_unlock(&q->lock); + pthread_mutex_unlock(&q->lock); if (m.start) { pn_delivery_t *d = pn_delivery(s, pn_dtag((char*)&tag, sizeof(tag))); pn_link_send(s, m.start, m.size); @@ -172,7 +169,7 @@ bool pn_connection_get_check_queues(pn_connection_t *c) { */ static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) { debug("received to queue %s", q->name); - uv_mutex_lock(&q->lock); + pthread_mutex_lock(&q->lock); VEC_PUSH(q->messages, m); if (q->messages.len == 1) { /* Was empty, notify waiting connections */ for (size_t i = 0; i < q->waiting.len; ++i) { @@ -182,18 +179,18 @@ static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) { } q->waiting.len = 0; } - uv_mutex_unlock(&q->lock); + pthread_mutex_unlock(&q->lock); } /* Thread safe set of queues */ typedef struct queues_t { - uv_mutex_t lock; + pthread_mutex_t lock; queue_t *queues; size_t sent; } queues_t; void queues_init(queues_t *qs) { - uv_mutex_init(&qs->lock); + pthread_mutex_init(&qs->lock, NULL); qs->queues = NULL; } @@ -202,12 +199,12 @@ void queues_destroy(queues_t *qs) { queue_destroy(q); free(q); } - uv_mutex_destroy(&qs->lock); + pthread_mutex_destroy(&qs->lock); } /** Get or create the named queue. */ queue_t* queues_get(queues_t *qs, const char* name) { - uv_mutex_lock(&qs->lock); + pthread_mutex_lock(&qs->lock); queue_t *q; for (q = qs->queues; q && strcmp(q->name, name) != 0; q = q->next) ; @@ -216,7 +213,7 @@ queue_t* queues_get(queues_t *qs, const char* name) { queue_init(q, name, qs->queues); qs->queues = q; } - uv_mutex_unlock(&qs->lock); + pthread_mutex_unlock(&qs->lock); return q; } @@ -255,7 +252,7 @@ static void link_send(broker_t *b, pn_link_t *s) { } static void queue_unsub(queue_t *q, pn_connection_t *c) { - uv_mutex_lock(&q->lock); + pthread_mutex_lock(&q->lock); for (size_t i = 0; i < q->waiting.len; ++i) { if (q->waiting.data[i] == c){ q->waiting.data[i] = q->waiting.data[0]; /* save old [0] */ @@ -263,7 +260,7 @@ static void queue_unsub(queue_t *q, pn_connection_t *c) { break; } } - uv_mutex_unlock(&q->lock); + pthread_mutex_unlock(&q->lock); } /* Unsubscribe from the queue of interest to this link. */ @@ -416,7 +413,7 @@ static void handle(broker_t* b, pn_event_t* e) { } } -static void broker_thread(void *void_broker) { +static void* broker_thread(void *void_broker) { broker_t *b = (broker_t*)void_broker; do { pn_event_batch_t *events = pn_proactor_wait(b->proactor); @@ -426,6 +423,7 @@ static void broker_thread(void *void_broker) { } pn_proactor_done(b->proactor, events); } while(!b->finished); + return NULL; } static void usage(const char *arg0) { @@ -474,13 +472,13 @@ int main(int argc, char **argv) { exit(1); } /* Start n-1 threads and use main thread */ - uv_thread_t* threads = (uv_thread_t*)calloc(sizeof(uv_thread_t), b.threads); + pthread_t* threads = (pthread_t*)calloc(sizeof(pthread_t), b.threads); for (size_t i = 0; i < b.threads-1; ++i) { - check(uv_thread_create(&threads[i], broker_thread, &b), "pthread_create"); + check(pthread_create(&threads[i], NULL, broker_thread, &b), "pthread_create"); } broker_thread(&b); /* Use the main thread too. */ for (size_t i = 0; i < b.threads-1; ++i) { - check(uv_thread_join(&threads[i]), "pthread_join"); + check(pthread_join(threads[i], NULL), "pthread_join"); } pn_proactor_free(b.proactor); free(threads); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/libuv_proactor.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c deleted file mode 100644 index 42bbfab..0000000 --- a/examples/c/proactor/libuv_proactor.c +++ /dev/null @@ -1,873 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <uv.h> - -#include <proton/condition.h> -#include <proton/connection_driver.h> -#include <proton/engine.h> -#include <proton/message.h> -#include <proton/object.h> -#include <proton/proactor.h> -#include <proton/transport.h> -#include <proton/url.h> - -#include <assert.h> -#include <stddef.h> -#include <stdio.h> -#include <stdlib.h> -#include <string.h> - -/* - libuv loop functions are thread unsafe. The only exception is uv_async_send() - which is a thread safe "wakeup" that can wake the uv_loop from another thread. - - To provide concurrency the proactor uses a "leader-worker-follower" model, - threads take turns at the roles: - - - a single "leader" calls libuv functions and runs the uv_loop in short bursts - to generate work. When there is work available it gives up leadership and - becomes a "worker" - - - "workers" handle events concurrently for distinct connections/listeners - They do as much work as they can get, when none is left they become "followers" - - - "followers" wait for the leader to generate work and become workers. - When the leader itself becomes a worker, one of the followers takes over. - - This model is symmetric: any thread can take on any role based on run-time - requirements. It also allows the IO and non-IO work associated with an IO - wake-up to be processed in a single thread with no context switches. - - Function naming: - - on_ - called in leader thread via uv_run(). - - leader_ - called in leader thread, while processing the leader_q. - - owner_ - called in owning thread, leader or worker but not concurrently. - - Note on_ and leader_ functions can call each other, the prefix indicates the - path they are most often called on. -*/ - -const char *COND_NAME = "proactor"; -const char *AMQP_PORT = "5672"; -const char *AMQP_PORT_NAME = "amqp"; -const char *AMQPS_PORT = "5671"; -const char *AMQPS_PORT_NAME = "amqps"; - -PN_HANDLE(PN_PROACTOR) - -/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management. - Class definitions are for identification as pn_event_t context only. -*/ -PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor) -PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener) - -/* common to connection and listener */ -typedef struct psocket_t { - /* Immutable */ - pn_proactor_t *proactor; - - /* Protected by proactor.lock */ - struct psocket_t* next; - void (*wakeup)(struct psocket_t*); /* interrupting action for leader */ - - /* Only used by leader */ - uv_tcp_t tcp; - void (*action)(struct psocket_t*); /* deferred action for leader */ - bool is_conn:1; - char host[NI_MAXHOST]; - char port[NI_MAXSERV]; -} psocket_t; - -/* Special value for psocket.next pointer when socket is not on any any list. */ -psocket_t UNLISTED; - -static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) { - ps->proactor = p; - ps->next = &UNLISTED; - ps->is_conn = is_conn; - ps->tcp.data = ps; - - /* For platforms that don't know about "amqp" and "amqps" service names. */ - if (strcmp(port, AMQP_PORT_NAME) == 0) - port = AMQP_PORT; - else if (strcmp(port, AMQPS_PORT_NAME) == 0) - port = AMQPS_PORT; - /* Set to "\001" to indicate a NULL as opposed to an empty string "" */ - strncpy(ps->host, host ? host : "\001", sizeof(ps->host)); - strncpy(ps->port, port ? port : "\001", sizeof(ps->port)); -} - -/* Turn "\001" back to NULL */ -static inline const char* fixstr(const char* str) { - return str[0] == '\001' ? NULL : str; -} - -typedef struct pconnection_t { - psocket_t psocket; - - /* Only used by owner thread */ - pn_connection_driver_t driver; - - /* Only used by leader */ - uv_connect_t connect; - uv_timer_t timer; - uv_write_t write; - uv_shutdown_t shutdown; - size_t writing; - bool reading:1; - bool server:1; /* accept, not connect */ -} pconnection_t; - -struct pn_listener_t { - psocket_t psocket; - - /* Only used by owner thread */ - pconnection_t *accepting; /* accept in progress */ - pn_condition_t *condition; - pn_collector_t *collector; - pn_event_batch_t batch; - pn_record_t *attachments; - void *context; - size_t backlog; -}; - - -typedef struct queue { psocket_t *front, *back; } queue; - -struct pn_proactor_t { - /* Leader thread */ - uv_cond_t cond; - uv_loop_t loop; - uv_async_t async; - uv_timer_t timer; - - /* Owner thread: proactor collector and batch can belong to leader or a worker */ - pn_collector_t *collector; - pn_event_batch_t batch; - - /* Protected by lock */ - uv_mutex_t lock; - queue start_q; - queue worker_q; - queue leader_q; - size_t interrupt; /* pending interrupts */ - pn_millis_t timeout; - size_t count; /* psocket count */ - bool inactive:1; - bool timeout_request:1; - bool timeout_elapsed:1; - bool has_leader:1; - bool batch_working:1; /* batch belongs to a worker. */ -}; - -static bool push_lh(queue *q, psocket_t *ps) { - if (ps->next != &UNLISTED) /* Don't move if already listed. */ - return false; - ps->next = NULL; - if (!q->front) { - q->front = q->back = ps; - } else { - q->back->next = ps; - q->back = ps; - } - return true; -} - -static psocket_t* pop_lh(queue *q) { - psocket_t *ps = q->front; - if (ps) { - q->front = ps->next; - ps->next = &UNLISTED; - } - return ps; -} - -static inline pconnection_t *as_pconnection_t(psocket_t* ps) { - return ps->is_conn ? (pconnection_t*)ps : NULL; -} - -static inline pn_listener_t *as_listener(psocket_t* ps) { - return ps->is_conn ? NULL: (pn_listener_t*)ps; -} - -/* Put ps on the leader queue for processing. Thread safe. */ -static void to_leader_lh(psocket_t *ps) { - push_lh(&ps->proactor->leader_q, ps); - uv_async_send(&ps->proactor->async); /* Wake leader */ -} - -static void to_leader(psocket_t *ps) { - uv_mutex_lock(&ps->proactor->lock); - to_leader_lh(ps); - uv_mutex_unlock(&ps->proactor->lock); -} - -/* Detach from IO and put ps on the worker queue */ -static void leader_to_worker(psocket_t *ps) { - if (ps->is_conn) { - pconnection_t *pc = as_pconnection_t(ps); - /* Don't detach if there are no events yet. */ - if (pn_connection_driver_has_event(&pc->driver)) { - if (pc->writing) { - pc->writing = 0; - uv_cancel((uv_req_t*)&pc->write); - } - if (pc->reading) { - pc->reading = false; - uv_read_stop((uv_stream_t*)&pc->psocket.tcp); - } - if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { - uv_timer_stop(&pc->timer); - } - } - } else { - pn_listener_t *l = as_listener(ps); - uv_read_stop((uv_stream_t*)&l->psocket.tcp); - } - uv_mutex_lock(&ps->proactor->lock); - push_lh(&ps->proactor->worker_q, ps); - uv_mutex_unlock(&ps->proactor->lock); -} - -/* Set a deferred action for leader, if not already set. */ -static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) { - uv_mutex_lock(&ps->proactor->lock); - if (!ps->action) { - ps->action = action; - } - to_leader_lh(ps); - uv_mutex_unlock(&ps->proactor->lock); -} - -/* Owner thread send to worker thread. Set deferred action if not already set. */ -static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) { - uv_mutex_lock(&ps->proactor->lock); - if (!ps->action) { - ps->action = action; - } - push_lh(&ps->proactor->worker_q, ps); - uv_async_send(&ps->proactor->async); /* Wake leader */ - uv_mutex_unlock(&ps->proactor->lock); -} - - -/* Re-queue for further work */ -static void worker_requeue(psocket_t* ps) { - uv_mutex_lock(&ps->proactor->lock); - push_lh(&ps->proactor->worker_q, ps); - uv_async_send(&ps->proactor->async); /* Wake leader */ - uv_mutex_unlock(&ps->proactor->lock); -} - -static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) { - pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc)); - if (!pc) return NULL; - if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) { - return NULL; - } - psocket_init(&pc->psocket, p, true, host, port); - if (server) { - pn_transport_set_server(pc->driver.transport); - } - pn_record_t *r = pn_connection_attachments(pc->driver.connection); - pn_record_def(r, PN_PROACTOR, PN_VOID); - pn_record_set(r, PN_PROACTOR, pc); - return pc; -} - -static pn_event_t *listener_batch_next(pn_event_batch_t *batch); -static pn_event_t *proactor_batch_next(pn_event_batch_t *batch); - -static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) { - return (batch->next_event == proactor_batch_next) ? - (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL; -} - -static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) { - return (batch->next_event == listener_batch_next) ? - (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL; -} - -static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) { - pn_connection_driver_t *d = pn_event_batch_connection_driver(batch); - return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL; -} - -static void leader_count(pn_proactor_t *p, int change) { - uv_mutex_lock(&p->lock); - p->count += change; - p->inactive = (p->count == 0); - uv_mutex_unlock(&p->lock); -} - -/* Free if there are no uv callbacks pending and no events */ -static void leader_pconnection_t_maybe_free(pconnection_t *pc) { - if (pn_connection_driver_has_event(&pc->driver)) { - leader_to_worker(&pc->psocket); /* Return to worker */ - } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) { - /* All UV requests are finished */ - pn_connection_driver_destroy(&pc->driver); - leader_count(pc->psocket.proactor, -1); - free(pc); - } -} - -/* Free if there are no uv callbacks pending and no events */ -static void leader_listener_maybe_free(pn_listener_t *l) { - if (pn_collector_peek(l->collector)) { - leader_to_worker(&l->psocket); /* Return to worker */ - } else if (!l->psocket.tcp.data) { - pn_condition_free(l->condition); - leader_count(l->psocket.proactor, -1); - free(l); - } -} - -/* Free if there are no uv callbacks pending and no events */ -static void leader_maybe_free(psocket_t *ps) { - if (ps->is_conn) { - leader_pconnection_t_maybe_free(as_pconnection_t(ps)); - } else { - leader_listener_maybe_free(as_listener(ps)); - } -} - -static void on_close(uv_handle_t *h) { - psocket_t *ps = (psocket_t*)h->data; - h->data = NULL; /* Mark closed */ - leader_maybe_free(ps); -} - -static void on_shutdown(uv_shutdown_t *shutdown, int err) { - psocket_t *ps = (psocket_t*)shutdown->data; - shutdown->data = NULL; /* Mark closed */ - leader_maybe_free(ps); -} - -static inline void leader_close(psocket_t *ps) { - if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) { - uv_close((uv_handle_t*)&ps->tcp, on_close); - } - pconnection_t *pc = as_pconnection_t(ps); - if (pc) { - pn_connection_driver_close(&pc->driver); - if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { - uv_timer_stop(&pc->timer); - uv_close((uv_handle_t*)&pc->timer, on_close); - } - } - leader_maybe_free(ps); -} - -static pconnection_t *get_pconnection_t(pn_connection_t* c) { - if (!c) return NULL; - pn_record_t *r = pn_connection_attachments(c); - return (pconnection_t*) pn_record_get(r, PN_PROACTOR); -} - -static void leader_error(psocket_t *ps, int err, const char* what) { - if (ps->is_conn) { - pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver; - pn_connection_driver_bind(driver); /* Bind so errors will be reported */ - pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s", - what, fixstr(ps->host), fixstr(ps->port), - uv_strerror(err)); - pn_connection_driver_close(driver); - } else { - pn_listener_t *l = as_listener(ps); - pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s", - what, fixstr(ps->host), fixstr(ps->port), - uv_strerror(err)); - pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE); - } - leader_to_worker(ps); /* Worker to handle the error */ -} - -/* uv-initialization */ -static int leader_init(psocket_t *ps) { - leader_count(ps->proactor, +1); - int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp); - if (!err) { - pconnection_t *pc = as_pconnection_t(ps); - if (pc) { - pc->connect.data = ps; - int err = uv_timer_init(&ps->proactor->loop, &pc->timer); - if (!err) { - pc->timer.data = pc; - } - } - } - if (err) { - leader_error(ps, err, "initialization"); - } - return err; -} - -/* Common logic for on_connect and on_accept */ -static void leader_connect_accept(pconnection_t *pc, int err, const char *what) { - if (!err) { - leader_to_worker(&pc->psocket); - } else { - leader_error(&pc->psocket, err, what); - } -} - -static void on_connect(uv_connect_t *connect, int err) { - leader_connect_accept((pconnection_t*)connect->data, err, "on connect"); -} - -static void on_accept(uv_stream_t* server, int err) { - pn_listener_t *l = (pn_listener_t*) server->data; - if (err) { - leader_error(&l->psocket, err, "on accept"); - } - pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); - leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */ -} - -static void leader_accept(psocket_t *ps) { - pn_listener_t * l = as_listener(ps); - pconnection_t *pc = l->accepting; - l->accepting = NULL; - if (pc) { - int err = leader_init(&pc->psocket); - if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp); - leader_connect_accept(pc, err, "on accept"); - } -} - -static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) { - int err = leader_init(ps); - struct addrinfo hints = { 0 }; - if (server) hints.ai_flags = AI_PASSIVE; - if (!err) { - err = uv_getaddrinfo(&ps->proactor->loop, info, NULL, fixstr(ps->host), fixstr(ps->port), &hints); - } - return err; -} - -static void leader_connect(psocket_t *ps) { - pconnection_t *pc = as_pconnection_t(ps); - uv_getaddrinfo_t info; - int err = leader_resolve(ps, &info, false); - if (!err) { - err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect); - uv_freeaddrinfo(info.addrinfo); - } - if (err) { - leader_error(ps, err, "connect to"); - } -} - -static void leader_listen(psocket_t *ps) { - pn_listener_t *l = as_listener(ps); - uv_getaddrinfo_t info; - int err = leader_resolve(ps, &info, true); - if (!err) { - err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0); - uv_freeaddrinfo(info.addrinfo); - } - if (!err) err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept); - if (err) { - leader_error(ps, err, "listen on "); - } -} - -static void on_tick(uv_timer_t *timer) { - pconnection_t *pc = (pconnection_t*)timer->data; - pn_transport_t *t = pc->driver.transport; - if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) { - uv_timer_stop(&pc->timer); - uint64_t now = uv_now(pc->timer.loop); - uint64_t next = pn_transport_tick(t, now); - if (next) { - uv_timer_start(&pc->timer, on_tick, next - now, 0); - } - } -} - -static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { - pconnection_t *pc = (pconnection_t*)stream->data; - if (nread >= 0) { - pn_connection_driver_read_done(&pc->driver, nread); - on_tick(&pc->timer); /* check for tick changes. */ - leader_to_worker(&pc->psocket); - /* Reading continues automatically until stopped. */ - } else if (nread == UV_EOF) { /* hangup */ - pn_connection_driver_read_close(&pc->driver); - leader_maybe_free(&pc->psocket); - } else { - leader_error(&pc->psocket, nread, "on read from"); - } -} - -static void on_write(uv_write_t* write, int err) { - pconnection_t *pc = (pconnection_t*)write->data; - write->data = NULL; - if (err == 0) { - pn_connection_driver_write_done(&pc->driver, pc->writing); - leader_to_worker(&pc->psocket); - } else if (err == UV_ECANCELED) { - leader_maybe_free(&pc->psocket); - } else { - leader_error(&pc->psocket, err, "on write to"); - } - pc->writing = 0; /* Need to send a new write request */ -} - -static void on_timeout(uv_timer_t *timer) { - pn_proactor_t *p = (pn_proactor_t*)timer->data; - uv_mutex_lock(&p->lock); - p->timeout_elapsed = true; - uv_mutex_unlock(&p->lock); -} - -// Read buffer allocation function for uv, just returns the transports read buffer. -static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) { - pconnection_t *pc = (pconnection_t*)stream->data; - pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); - *buf = uv_buf_init(rbuf.start, rbuf.size); -} - -static void leader_rewatch(psocket_t *ps) { - int err = 0; - if (ps->is_conn) { - pconnection_t *pc = as_pconnection_t(ps); - if (pc->timer.data) { /* uv-initialized */ - on_tick(&pc->timer); /* Re-enable ticks if required */ - } - pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); - pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); - - /* Ticks and checking buffers can generate events, process before proceeding */ - if (pn_connection_driver_has_event(&pc->driver)) { - leader_to_worker(ps); - } else { /* Re-watch for IO */ - if (wbuf.size > 0 && !pc->writing) { - pc->writing = wbuf.size; - uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size); - pc->write.data = ps; - uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write); - } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) { - pc->shutdown.data = ps; - uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown); - } - if (rbuf.size > 0 && !pc->reading) { - pc->reading = true; - err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read); - } - } - } else { - pn_listener_t *l = as_listener(ps); - err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept); - } - if (err) { - leader_error(ps, err, "rewatch"); - } -} - -/* Set the event in the proactor's batch */ -static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) { - pn_collector_put(p->collector, pn_proactor__class(), p, t); - p->batch_working = true; - return &p->batch; -} - -/* Return the next event batch or 0 if no events are ready */ -static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) { - if (!p->batch_working) { /* Can generate proactor events */ - if (p->inactive) { - p->inactive = false; - return proactor_batch_lh(p, PN_PROACTOR_INACTIVE); - } - if (p->interrupt > 0) { - --p->interrupt; - return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT); - } - if (p->timeout_elapsed) { - p->timeout_elapsed = false; - return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT); - } - } - for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) { - if (ps->is_conn) { - pconnection_t *pc = as_pconnection_t(ps); - return &pc->driver.batch; - } else { /* Listener */ - pn_listener_t *l = as_listener(ps); - return &l->batch; - } - to_leader(ps); /* No event, back to leader */ - } - return 0; -} - -/* Called in any thread to set a wakeup action. Replaces any previous wakeup action. */ -static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) { - uv_mutex_lock(&ps->proactor->lock); - ps->wakeup = action; - to_leader_lh(ps); - uv_mutex_unlock(&ps->proactor->lock); -} - -pn_listener_t *pn_event_listener(pn_event_t *e) { - return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL; -} - -pn_proactor_t *pn_event_proactor(pn_event_t *e) { - if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e); - pn_listener_t *l = pn_event_listener(e); - if (l) return l->psocket.proactor; - pn_connection_t *c = pn_event_connection(e); - if (c) return pn_connection_proactor(pn_event_connection(e)); - return NULL; -} - -void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { - pconnection_t *pc = batch_pconnection(batch); - if (pc) { - if (pn_connection_driver_has_event(&pc->driver)) { - /* Process all events before going back to IO. */ - worker_requeue(&pc->psocket); - } else if (pn_connection_driver_finished(&pc->driver)) { - owner_to_leader(&pc->psocket, leader_close); - } else { - owner_to_leader(&pc->psocket, leader_rewatch); - } - return; - } - pn_listener_t *l = batch_listener(batch); - if (l) { - owner_to_leader(&l->psocket, leader_rewatch); - return; - } - pn_proactor_t *bp = batch_proactor(batch); - if (bp == p) { - uv_mutex_lock(&p->lock); - p->batch_working = false; - uv_async_send(&p->async); /* Wake leader */ - uv_mutex_unlock(&p->lock); - return; - } -} - -/* Run follower/leader loop till we can return an event and be a worker */ -pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { - uv_mutex_lock(&p->lock); - /* Try to grab work immediately. */ - pn_event_batch_t *batch = get_batch_lh(p); - if (batch == NULL) { - /* No work available, follow the leader */ - while (p->has_leader) { - uv_cond_wait(&p->cond, &p->lock); - } - /* Lead till there is work to do. */ - p->has_leader = true; - while (batch == NULL) { - if (p->timeout_request) { - p->timeout_request = false; - if (p->timeout) { - uv_timer_start(&p->timer, on_timeout, p->timeout, 0); - } else { - uv_timer_stop(&p->timer); - } - } - for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) { - void (*action)(psocket_t*) = ps->action; - void (*wakeup)(psocket_t*) = ps->wakeup; - ps->action = NULL; - ps->wakeup = NULL; - if (action || wakeup) { - uv_mutex_unlock(&p->lock); - if (action) action(ps); - if (wakeup) wakeup(ps); - uv_mutex_lock(&p->lock); - } - } - batch = get_batch_lh(p); - if (batch == NULL) { - uv_mutex_unlock(&p->lock); - uv_run(&p->loop, UV_RUN_ONCE); - uv_mutex_lock(&p->lock); - } - } - /* Signal the next leader and return to work */ - p->has_leader = false; - uv_cond_signal(&p->cond); - } - uv_mutex_unlock(&p->lock); - return batch; -} - -void pn_proactor_interrupt(pn_proactor_t *p) { - uv_mutex_lock(&p->lock); - ++p->interrupt; - uv_async_send(&p->async); /* Interrupt the UV loop */ - uv_mutex_unlock(&p->lock); -} - -void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { - uv_mutex_lock(&p->lock); - p->timeout = t; - p->timeout_request = true; - uv_async_send(&p->async); /* Interrupt the UV loop */ - uv_mutex_unlock(&p->lock); -} - -int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) { - pconnection_t *pc = new_pconnection_t(p, c, false, host, port); - if (!pc) { - return PN_OUT_OF_MEMORY; - } - /* Process PN_CONNECTION_INIT before binding */ - owner_to_worker(&pc->psocket, leader_connect); - return 0; -} - -int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog) -{ - psocket_init(&l->psocket, p, false, host, port); - l->backlog = backlog; - owner_to_leader(&l->psocket, leader_listen); - return 0; -} - -pn_proactor_t *pn_connection_proactor(pn_connection_t* c) { - pconnection_t *pc = get_pconnection_t(c); - return pc ? pc->psocket.proactor : NULL; -} - -void leader_wake_connection(psocket_t *ps) { - pconnection_t *pc = as_pconnection_t(ps); - pn_connection_t *c = pc->driver.connection; - pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE); - leader_to_worker(ps); -} - -void pn_connection_wake(pn_connection_t* c) { - wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection); -} - -pn_proactor_t *pn_proactor() { - pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p)); - p->collector = pn_collector(); - p->batch.next_event = &proactor_batch_next; - if (!p->collector) return NULL; - uv_loop_init(&p->loop); - uv_mutex_init(&p->lock); - uv_cond_init(&p->cond); - uv_async_init(&p->loop, &p->async, NULL); - uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */ - p->timer.data = p; - return p; -} - -static void on_stopping(uv_handle_t* h, void* v) { - uv_close(h, NULL); /* Close this handle */ - if (!uv_loop_alive(h->loop)) /* Everything closed */ - uv_stop(h->loop); /* Stop the loop, pn_proactor_destroy() can return */ -} - -void pn_proactor_free(pn_proactor_t *p) { - uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */ - uv_run(&p->loop, UV_RUN_DEFAULT); /* Run till stop, all handles closed */ - uv_loop_close(&p->loop); - uv_mutex_destroy(&p->lock); - uv_cond_destroy(&p->cond); - pn_collector_free(p->collector); - free(p); -} - -static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { - pn_listener_t *l = batch_listener(batch); - pn_event_t *handled = pn_collector_prev(l->collector); - if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) { - owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */ - } - return pn_collector_next(l->collector); -} - -static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { - return pn_collector_next(batch_proactor(batch)->collector); -} - -static void pn_listener_free(pn_listener_t *l) { - if (l) { - if (!l->collector) pn_collector_free(l->collector); - if (!l->condition) pn_condition_free(l->condition); - if (!l->attachments) pn_free(l->attachments); - free(l); - } -} - -pn_listener_t *pn_listener() { - pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t)); - if (l) { - l->batch.next_event = listener_batch_next; - l->collector = pn_collector(); - l->condition = pn_condition(); - l->attachments = pn_record(); - if (!l->condition || !l->collector || !l->attachments) { - pn_listener_free(l); - return NULL; - } - } - return l; -} - -void pn_listener_close(pn_listener_t* l) { - wakeup(&l->psocket, leader_close); -} - -pn_proactor_t *pn_listener_proactor(pn_listener_t* l) { - return l ? l->psocket.proactor : NULL; -} - -pn_condition_t* pn_listener_condition(pn_listener_t* l) { - return l->condition; -} - -void *pn_listener_get_context(pn_listener_t *l) { - return l->context; -} - -void pn_listener_set_context(pn_listener_t *l, void *context) { - l->context = context; -} - -pn_record_t *pn_listener_attachments(pn_listener_t *l) { - return l->attachments; -} - -int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { - if (l->accepting) { - return PN_STATE_ERR; /* Only one at a time */ - } - l->accepting = new_pconnection_t( - l->psocket.proactor, c, true, l->psocket.host, l->psocket.port); - if (!l->accepting) { - return UV_ENOMEM; - } - owner_to_leader(&l->psocket, leader_accept); - return 0; -} - http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/test.py ---------------------------------------------------------------------- diff --git a/examples/c/proactor/test.py b/examples/c/proactor/test.py index a86425d..29aa327 100644 --- a/examples/c/proactor/test.py +++ b/examples/c/proactor/test.py @@ -32,27 +32,27 @@ def receive_expect(n): return ''.join('{"sequence"=%s}\n'%i for i in xrange(1, n+1)) + "%s messages received\n"%n class CExampleTest(BrokerTestCase): - broker_exe = ["libuv_broker"] + broker_exe = ["broker"] def test_send_receive(self): """Send first then receive""" - s = self.proc(["libuv_send", "-a", self.addr]) + s = self.proc(["send", "-a", self.addr]) self.assertEqual("100 messages sent and acknowledged\n", s.wait_out()) - r = self.proc(["libuv_receive", "-a", self.addr]) + r = self.proc(["receive", "-a", self.addr]) self.assertEqual(receive_expect(100), r.wait_out()) def test_receive_send(self): """Start receiving first, then send.""" - r = self.proc(["libuv_receive", "-a", self.addr]); - s = self.proc(["libuv_send", "-a", self.addr]); + r = self.proc(["receive", "-a", self.addr]); + s = self.proc(["send", "-a", self.addr]); self.assertEqual("100 messages sent and acknowledged\n", s.wait_out()) self.assertEqual(receive_expect(100), r.wait_out()) def test_timed_send(self): """Send with timed delay""" - s = self.proc(["libuv_send", "-a", self.addr, "-d100", "-m3"]) + s = self.proc(["send", "-a", self.addr, "-d100", "-m3"]) self.assertEqual("3 messages sent and acknowledged\n", s.wait_out()) - r = self.proc(["libuv_receive", "-a", self.addr, "-m3"]) + r = self.proc(["receive", "-a", self.addr, "-m3"]) self.assertEqual(receive_expect(3), r.wait_out()) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/examples/c/proactor/thread.h ---------------------------------------------------------------------- diff --git a/examples/c/proactor/thread.h b/examples/c/proactor/thread.h new file mode 100644 index 0000000..3b9f19e --- /dev/null +++ b/examples/c/proactor/thread.h @@ -0,0 +1,49 @@ +#ifndef _PROTON_EXAMPLES_C_PROACTOR_THREADS_H +#define _PROTON_EXAMPLES_C_PROACTOR_THREADS_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* EXAMPLE USE ONLY. Simulate the subset of POSIX threads used by examples for windows */ + +#ifdef _WIN32 + +#include <windows.h> +#include <time.h> +#define _WIN32_WINNT 0x500 /* WINBASE.H - Enable SignalObjectAndWait */ +#include <process.h> +#include <windows.h> + +#define pthread_function DWORD WINAPI +#define pthread_function_return DWORD +#define pthread_t HANDLE +#define pthread_create(thhandle,attr,thfunc,tharg) (int)((*thhandle=(HANDLE)_beginthreadex(NULL,0,(DWORD WINAPI(*)())thfunc,tharg,0,NULL))==NULL) +#define pthread_join(thread, result) ((WaitForSingleObject((thread),INFINITE)!=WAIT_OBJECT_0) || !CloseHandle(thread)) +#define pthread_mutex_T HANDLE +#define pthread_mutex_init(pobject,pattr) (*pobject=CreateMutex(NULL,FALSE,NULL)) +#define pthread_mutex_destroy(pobject) CloseHandle(*pobject) +#define pthread_mutex_lock(pobject) WaitForSingleObject(*pobject,INFINITE) +#define pthread_mutex_unlock(pobject) ReleaseMutex(*pobject) + +#else + +#include <pthread.h> + +#endif + +#endif http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt index 8edb661..e5552c5 100644 --- a/proton-c/CMakeLists.txt +++ b/proton-c/CMakeLists.txt @@ -105,6 +105,13 @@ else(PN_WINAPI) set (pn_selector_impl src/reactor/io/posix/selector.c) endif(PN_WINAPI) +# Select proactor impl +find_package(Libuv) +if (Libuv_FOUND) + set (qpid-proton-proactor src/proactor/libuv.c) + set (PROACTOR_LIBS ${Libuv_LIBRARIES}) +endif() + # Link in SASL if present if (SASL_IMPL STREQUAL cyrus) set(pn_sasl_impl src/sasl/sasl.c src/sasl/cyrus_sasl.c) @@ -116,7 +123,7 @@ endif () # Set Compiler extra flags for Solaris when using SunStudio if(CMAKE_CXX_COMPILER_ID STREQUAL "SunPro" ) - set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mt" ) + set( CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -mt" ) endif() if(CMAKE_C_COMPILER_ID STREQUAL "SunPro" ) @@ -327,6 +334,7 @@ set (qpid-proton-platform-all src/reactor/io/windows/selector.c src/reactor/io/posix/io.c src/reactor/io/posix/selector.c + src/proactor/libuv.c ) # platform specific library build: @@ -379,7 +387,7 @@ set (qpid-proton-core src/core/autodetect.c src/core/transport.c src/core/message.c - ) +) set (qpid-proton-include-generated ${CMAKE_CURRENT_BINARY_DIR}/src/encodings.h @@ -455,6 +463,7 @@ set (qpid-proton-include include/proton/log.h include/proton/message.h include/proton/object.h + include/proton/proactor.h include/proton/sasl.h include/proton/session.h include/proton/ssl.h @@ -495,9 +504,15 @@ set_source_files_properties ( COMPILE_DEFINITIONS "${PLATFORM_DEFINITIONS}" ) +set_source_files_properties (${qpid-proton-proactor} PROPERTIES + # Skip COMPILE_LANGUAGE_FLAGS, libuv.h won't compile with --std=c99 + COMPILE_FLAGS "${COMPILE_WARNING_FLAGS} ${LTO} " + ) + if (BUILD_WITH_CXX) set_source_files_properties ( ${qpid-proton-core} + ${qpid-proton-proactor} ${qpid-proton-layers} ${qpid-proton-extra} ${qpid-proton-platform} @@ -526,6 +541,12 @@ set_target_properties ( LINK_FLAGS "${CATCH_UNDEFINED} ${LTO}" ) +add_library ( + qpid-proton-proactor SHARED + ${qpid-proton-proactor} + ) +target_link_libraries (qpid-proton-proactor qpid-proton-core ${PROACTOR_LIBS}) + add_library( qpid-proton SHARED # Proton Core @@ -534,7 +555,8 @@ add_library( ${qpid-proton-platform} ${qpid-proton-include} ${qpid-proton-include-generated} - + # Proactor + ${qpid-proton-proactor} # Proton Reactor/Messenger ${qpid-proton-extra} ${qpid-proton-platform-io} @@ -550,7 +572,7 @@ if (MSVC) add_dependencies(qpid-proton qpid-proton-core) endif (MSVC) -target_link_libraries (qpid-proton ${UUID_LIB} ${SSL_LIB} ${SASL_LIB} ${TIME_LIB} ${PLATFORM_LIBS}) +target_link_libraries (qpid-proton ${UUID_LIB} ${SSL_LIB} ${SASL_LIB} ${TIME_LIB} ${PLATFORM_LIBS} ${PROACTOR_LIBS}) set_target_properties ( qpid-proton @@ -586,32 +608,26 @@ install (FILES ${headers} DESTINATION ${INCLUDE_INSTALL_DIR}/proton) install (FILES ${CMAKE_CURRENT_BINARY_DIR}/include/proton/version.h DESTINATION ${INCLUDE_INSTALL_DIR}/proton) -# Pkg config file -configure_file( - ${CMAKE_CURRENT_SOURCE_DIR}/src/libqpid-proton.pc.in - ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton.pc @ONLY) -install (FILES - ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton.pc - DESTINATION ${LIB_INSTALL_DIR}/pkgconfig) -configure_file( - ${CMAKE_CURRENT_SOURCE_DIR}/src/libqpid-proton-core.pc.in - ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton-core.pc @ONLY) -install (FILES - ${CMAKE_CURRENT_BINARY_DIR}/libqpid-proton-core.pc - DESTINATION ${LIB_INSTALL_DIR}/pkgconfig) - +# Set ${VAR}/${VAR}DEBUG variables, configure and install the packageconf files for LIB +macro(configure_lib VAR LIB) + if(DEFINED CMAKE_IMPORT_LIBRARY_PREFIX) + set(LIB_PREFIX ${CMAKE_IMPORT_LIBRARY_PREFIX}) + set(LIB_SUFFIX ${CMAKE_IMPORT_LIBRARY_SUFFIX}) + else() + set(LIB_PREFIX ${CMAKE_SHARED_LIBRARY_PREFIX}) + set(LIB_SUFFIX ${CMAKE_SHARED_LIBRARY_SUFFIX}) + endif() + set(${VAR} ${LIB_PREFIX}${LIB}${LIB_SUFFIX}) + set("${VAR}DEBUG" ${LIB_PREFIX}${LIB}${CMAKE_DEBUG_POSTFIX}${LIB_SUFFIX}) + configure_file( + ${CMAKE_CURRENT_SOURCE_DIR}/src/lib${LIB}.pc.in + ${CMAKE_CURRENT_BINARY_DIR}/lib${LIB}.pc @ONLY) + install (FILES ${CMAKE_CURRENT_BINARY_DIR}/lib${LIB}.pc DESTINATION ${LIB_INSTALL_DIR}/pkgconfig) +endmacro() -if (DEFINED CMAKE_IMPORT_LIBRARY_PREFIX) -set(PROTONLIB ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton${CMAKE_IMPORT_LIBRARY_SUFFIX}) -set(PROTONLIBDEBUG ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton${CMAKE_DEBUG_POSTFIX}${CMAKE_IMPORT_LIBRARY_SUFFIX}) -set(PROTONCORELIB ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton-core${CMAKE_IMPORT_LIBRARY_SUFFIX}) -set(PROTONCORELIBDEBUG ${CMAKE_IMPORT_LIBRARY_PREFIX}qpid-proton-core${CMAKE_DEBUG_POSTFIX}${CMAKE_IMPORT_LIBRARY_SUFFIX}) -else () -set(PROTONLIB ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton${CMAKE_SHARED_LIBRARY_SUFFIX}) -set(PROTONLIBDEBUG ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton${CMAKE_DEBUG_POSTFIX}${CMAKE_SHARED_LIBRARY_SUFFIX}) -set(PROTONCORELIB ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton-core${CMAKE_SHARED_LIBRARY_SUFFIX}) -set(PROTONCORELIBDEBUG ${CMAKE_SHARED_LIBRARY_PREFIX}qpid-proton-core${CMAKE_DEBUG_POSTFIX}${CMAKE_SHARED_LIBRARY_SUFFIX}) -endif () +configure_lib(PROTONLIB qpid-proton) +configure_lib(PROTONCORELIB qpid-proton-core) +configure_lib(PROTONPROACTORLIB qpid-proton-proactor) include(WriteBasicConfigVersionFile) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/include/proton/import_export.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/import_export.h b/proton-c/include/proton/import_export.h index 6547a07..86776cd 100644 --- a/proton-c/include/proton/import_export.h +++ b/proton-c/include/proton/import_export.h @@ -56,6 +56,13 @@ # define PN_EXTERN PN_IMPORT #endif +// For proactor proton symbols +#if defined(qpid_proton_proactor_EXPORTS) || defined(qpid_proton_EXPORTS) +# define PNP_EXTERN PN_EXPORT +#else +# define PNP_EXTERN PN_IMPORT +#endif + // For extra proton symbols #if defined(qpid_proton_EXPORTS) # define PNX_EXTERN PN_EXPORT http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/include/proton/listener.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h index 729c095..4656ee4 100644 --- a/proton-c/include/proton/listener.h +++ b/proton-c/include/proton/listener.h @@ -63,7 +63,7 @@ PN_EXTERN pn_condition_t *pn_listener_condition(pn_listener_t *l); /** * @cond INTERNAL */ - + /** * @deprecated * @@ -81,7 +81,7 @@ PN_EXTERN void pn_listener_set_context(pn_listener_t *listener, void *context); /** * @endcond */ - + /** * Get the attachments that are associated with a listener object. */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/include/proton/proactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h index 695bbb1..71a7dda 100644 --- a/proton-c/include/proton/proactor.h +++ b/proton-c/include/proton/proactor.h @@ -53,13 +53,13 @@ extern "C" { /** * Create a proactor. Must be freed with pn_proactor_free() */ -pn_proactor_t *pn_proactor(void); +PNP_EXTERN pn_proactor_t *pn_proactor(void); /** * Free the proactor. Abort any open network connections and clean up all * associated resources. */ -void pn_proactor_free(pn_proactor_t *proactor); +PNP_EXTERN void pn_proactor_free(pn_proactor_t *proactor); /** * Connect connection to host/port. Connection and transport events will be @@ -72,9 +72,9 @@ void pn_proactor_free(pn_proactor_t *proactor); * * @return error on immediate error, e.g. an allocation failure. * Other errors are indicated by connection or transport events via - * pn_proactor_wait() +PNP_EXTERN * pn_proactor_wait() */ -int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection, +PNP_EXTERN int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection, const char *host, const char *port); /** @@ -91,7 +91,7 @@ int pn_proactor_connect(pn_proactor_t *proactor, pn_connection_t *connection, * Other errors are indicated by pn_listener_condition() on the * PN_LISTENER_CLOSE event. */ -int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener, +PNP_EXTERN int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener, const char *host, const char *port, int backlog); /** @@ -111,7 +111,7 @@ int pn_proactor_listen(pn_proactor_t *proactor, pn_listener_t *listener, * batch must be handled in sequence, but batches returned by separate * calls to pn_proactor_wait() can be handled concurrently. */ -pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor); +PNP_EXTERN pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor); /** * Call when done handling a batch of events. @@ -122,7 +122,7 @@ pn_event_batch_t *pn_proactor_wait(pn_proactor_t *proactor); * @note Thread-safe: may be called from any thread provided the * exactly once rule is respected. */ -void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events); +PNP_EXTERN void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events); /** * Cause PN_PROACTOR_INTERRUPT to be returned to exactly one call of @@ -136,7 +136,7 @@ void pn_proactor_done(pn_proactor_t *proactor, pn_event_batch_t *events); * * @note Thread-safe. */ -void pn_proactor_interrupt(pn_proactor_t *proactor); +PNP_EXTERN void pn_proactor_interrupt(pn_proactor_t *proactor); /** * Cause PN_PROACTOR_TIMEOUT to be returned to a thread calling wait() @@ -148,7 +148,7 @@ void pn_proactor_interrupt(pn_proactor_t *proactor); * timeout. `pn_proactor_set_timeout(0)` will cancel the timeout * without setting a new one. */ -void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout); +PNP_EXTERN void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout); /** * Cause a PN_CONNECTION_WAKE event to be returned by the proactor, even if @@ -160,22 +160,22 @@ void pn_proactor_set_timeout(pn_proactor_t *proactor, pn_millis_t timeout); * Wakes can be "coalesced" - if several pn_connection_wake() calls happen * concurrently, there may be only one PN_CONNECTION_WAKE event. */ -void pn_connection_wake(pn_connection_t *connection); +PNP_EXTERN void pn_connection_wake(pn_connection_t *connection); /** * Return the proactor associated with a connection or NULL. */ -pn_proactor_t *pn_connection_proactor(pn_connection_t *connection); +PNP_EXTERN pn_proactor_t *pn_connection_proactor(pn_connection_t *connection); /** * Return the proactor associated with an event or NULL. */ -pn_proactor_t *pn_event_proactor(pn_event_t *event); +PNP_EXTERN pn_proactor_t *pn_event_proactor(pn_event_t *event); /** * Return the listener associated with an event or NULL. */ -pn_listener_t *pn_event_listener(pn_event_t *event); +PNP_EXTERN pn_listener_t *pn_event_listener(pn_event_t *event); /** * @} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/include/proton/types.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/types.h b/proton-c/include/proton/types.h index 4400393..1abe9e6 100644 --- a/proton-c/include/proton/types.h +++ b/proton-c/include/proton/types.h @@ -407,6 +407,12 @@ typedef struct pn_delivery_t pn_delivery_t; typedef struct pn_collector_t pn_collector_t; /** + * A listener accepts connections. + * @ingroup listener + */ +typedef struct pn_listener_t pn_listener_t; + +/** * An AMQP Transport object. * * A pn_transport_t encapsulates the transport related state of all @@ -419,6 +425,11 @@ typedef struct pn_collector_t pn_collector_t; typedef struct pn_transport_t pn_transport_t; /** + * The proactor, see pn_proactor() + */ +typedef struct pn_proactor_t pn_proactor_t; + +/** * @cond INTERNAL * * An event handler @@ -426,12 +437,6 @@ typedef struct pn_transport_t pn_transport_t; * A pn_handler_t is target of ::pn_event_t dispatched by the pn_reactor_t */ typedef struct pn_handler_t pn_handler_t; - -/** - * - */ -typedef struct pn_proactor_t pn_proactor_t; -typedef struct pn_listener_t pn_listener_t; /** * @endcond */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/src/libqpid-proton-proactor.pc.in ---------------------------------------------------------------------- diff --git a/proton-c/src/libqpid-proton-proactor.pc.in b/proton-c/src/libqpid-proton-proactor.pc.in new file mode 100644 index 0000000..19007a8 --- /dev/null +++ b/proton-c/src/libqpid-proton-proactor.pc.in @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +prefix=@PREFIX@ +exec_prefix=@EXEC_PREFIX@ +libdir=@LIBDIR@ +includedir=@INCLUDEDIR@ + +Name: Proton Proactor +Description: Qpid Proton C proative IO library +Version: @PN_VERSION@ +URL: http://qpid.apache.org/proton/ +Libs: -L${libdir} -lqpid-proton-proactor +Cflags: -I${includedir} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/afacb165/proton-c/src/proactor/libuv.c ---------------------------------------------------------------------- diff --git a/proton-c/src/proactor/libuv.c b/proton-c/src/proactor/libuv.c new file mode 100644 index 0000000..42bbfab --- /dev/null +++ b/proton-c/src/proactor/libuv.c @@ -0,0 +1,873 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <uv.h> + +#include <proton/condition.h> +#include <proton/connection_driver.h> +#include <proton/engine.h> +#include <proton/message.h> +#include <proton/object.h> +#include <proton/proactor.h> +#include <proton/transport.h> +#include <proton/url.h> + +#include <assert.h> +#include <stddef.h> +#include <stdio.h> +#include <stdlib.h> +#include <string.h> + +/* + libuv loop functions are thread unsafe. The only exception is uv_async_send() + which is a thread safe "wakeup" that can wake the uv_loop from another thread. + + To provide concurrency the proactor uses a "leader-worker-follower" model, + threads take turns at the roles: + + - a single "leader" calls libuv functions and runs the uv_loop in short bursts + to generate work. When there is work available it gives up leadership and + becomes a "worker" + + - "workers" handle events concurrently for distinct connections/listeners + They do as much work as they can get, when none is left they become "followers" + + - "followers" wait for the leader to generate work and become workers. + When the leader itself becomes a worker, one of the followers takes over. + + This model is symmetric: any thread can take on any role based on run-time + requirements. It also allows the IO and non-IO work associated with an IO + wake-up to be processed in a single thread with no context switches. + + Function naming: + - on_ - called in leader thread via uv_run(). + - leader_ - called in leader thread, while processing the leader_q. + - owner_ - called in owning thread, leader or worker but not concurrently. + + Note on_ and leader_ functions can call each other, the prefix indicates the + path they are most often called on. +*/ + +const char *COND_NAME = "proactor"; +const char *AMQP_PORT = "5672"; +const char *AMQP_PORT_NAME = "amqp"; +const char *AMQPS_PORT = "5671"; +const char *AMQPS_PORT_NAME = "amqps"; + +PN_HANDLE(PN_PROACTOR) + +/* pn_proactor_t and pn_listener_t are plain C structs with normal memory management. + Class definitions are for identification as pn_event_t context only. +*/ +PN_STRUCT_CLASSDEF(pn_proactor, CID_pn_proactor) +PN_STRUCT_CLASSDEF(pn_listener, CID_pn_listener) + +/* common to connection and listener */ +typedef struct psocket_t { + /* Immutable */ + pn_proactor_t *proactor; + + /* Protected by proactor.lock */ + struct psocket_t* next; + void (*wakeup)(struct psocket_t*); /* interrupting action for leader */ + + /* Only used by leader */ + uv_tcp_t tcp; + void (*action)(struct psocket_t*); /* deferred action for leader */ + bool is_conn:1; + char host[NI_MAXHOST]; + char port[NI_MAXSERV]; +} psocket_t; + +/* Special value for psocket.next pointer when socket is not on any any list. */ +psocket_t UNLISTED; + +static void psocket_init(psocket_t* ps, pn_proactor_t* p, bool is_conn, const char *host, const char *port) { + ps->proactor = p; + ps->next = &UNLISTED; + ps->is_conn = is_conn; + ps->tcp.data = ps; + + /* For platforms that don't know about "amqp" and "amqps" service names. */ + if (strcmp(port, AMQP_PORT_NAME) == 0) + port = AMQP_PORT; + else if (strcmp(port, AMQPS_PORT_NAME) == 0) + port = AMQPS_PORT; + /* Set to "\001" to indicate a NULL as opposed to an empty string "" */ + strncpy(ps->host, host ? host : "\001", sizeof(ps->host)); + strncpy(ps->port, port ? port : "\001", sizeof(ps->port)); +} + +/* Turn "\001" back to NULL */ +static inline const char* fixstr(const char* str) { + return str[0] == '\001' ? NULL : str; +} + +typedef struct pconnection_t { + psocket_t psocket; + + /* Only used by owner thread */ + pn_connection_driver_t driver; + + /* Only used by leader */ + uv_connect_t connect; + uv_timer_t timer; + uv_write_t write; + uv_shutdown_t shutdown; + size_t writing; + bool reading:1; + bool server:1; /* accept, not connect */ +} pconnection_t; + +struct pn_listener_t { + psocket_t psocket; + + /* Only used by owner thread */ + pconnection_t *accepting; /* accept in progress */ + pn_condition_t *condition; + pn_collector_t *collector; + pn_event_batch_t batch; + pn_record_t *attachments; + void *context; + size_t backlog; +}; + + +typedef struct queue { psocket_t *front, *back; } queue; + +struct pn_proactor_t { + /* Leader thread */ + uv_cond_t cond; + uv_loop_t loop; + uv_async_t async; + uv_timer_t timer; + + /* Owner thread: proactor collector and batch can belong to leader or a worker */ + pn_collector_t *collector; + pn_event_batch_t batch; + + /* Protected by lock */ + uv_mutex_t lock; + queue start_q; + queue worker_q; + queue leader_q; + size_t interrupt; /* pending interrupts */ + pn_millis_t timeout; + size_t count; /* psocket count */ + bool inactive:1; + bool timeout_request:1; + bool timeout_elapsed:1; + bool has_leader:1; + bool batch_working:1; /* batch belongs to a worker. */ +}; + +static bool push_lh(queue *q, psocket_t *ps) { + if (ps->next != &UNLISTED) /* Don't move if already listed. */ + return false; + ps->next = NULL; + if (!q->front) { + q->front = q->back = ps; + } else { + q->back->next = ps; + q->back = ps; + } + return true; +} + +static psocket_t* pop_lh(queue *q) { + psocket_t *ps = q->front; + if (ps) { + q->front = ps->next; + ps->next = &UNLISTED; + } + return ps; +} + +static inline pconnection_t *as_pconnection_t(psocket_t* ps) { + return ps->is_conn ? (pconnection_t*)ps : NULL; +} + +static inline pn_listener_t *as_listener(psocket_t* ps) { + return ps->is_conn ? NULL: (pn_listener_t*)ps; +} + +/* Put ps on the leader queue for processing. Thread safe. */ +static void to_leader_lh(psocket_t *ps) { + push_lh(&ps->proactor->leader_q, ps); + uv_async_send(&ps->proactor->async); /* Wake leader */ +} + +static void to_leader(psocket_t *ps) { + uv_mutex_lock(&ps->proactor->lock); + to_leader_lh(ps); + uv_mutex_unlock(&ps->proactor->lock); +} + +/* Detach from IO and put ps on the worker queue */ +static void leader_to_worker(psocket_t *ps) { + if (ps->is_conn) { + pconnection_t *pc = as_pconnection_t(ps); + /* Don't detach if there are no events yet. */ + if (pn_connection_driver_has_event(&pc->driver)) { + if (pc->writing) { + pc->writing = 0; + uv_cancel((uv_req_t*)&pc->write); + } + if (pc->reading) { + pc->reading = false; + uv_read_stop((uv_stream_t*)&pc->psocket.tcp); + } + if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { + uv_timer_stop(&pc->timer); + } + } + } else { + pn_listener_t *l = as_listener(ps); + uv_read_stop((uv_stream_t*)&l->psocket.tcp); + } + uv_mutex_lock(&ps->proactor->lock); + push_lh(&ps->proactor->worker_q, ps); + uv_mutex_unlock(&ps->proactor->lock); +} + +/* Set a deferred action for leader, if not already set. */ +static void owner_to_leader(psocket_t *ps, void (*action)(psocket_t*)) { + uv_mutex_lock(&ps->proactor->lock); + if (!ps->action) { + ps->action = action; + } + to_leader_lh(ps); + uv_mutex_unlock(&ps->proactor->lock); +} + +/* Owner thread send to worker thread. Set deferred action if not already set. */ +static void owner_to_worker(psocket_t *ps, void (*action)(psocket_t*)) { + uv_mutex_lock(&ps->proactor->lock); + if (!ps->action) { + ps->action = action; + } + push_lh(&ps->proactor->worker_q, ps); + uv_async_send(&ps->proactor->async); /* Wake leader */ + uv_mutex_unlock(&ps->proactor->lock); +} + + +/* Re-queue for further work */ +static void worker_requeue(psocket_t* ps) { + uv_mutex_lock(&ps->proactor->lock); + push_lh(&ps->proactor->worker_q, ps); + uv_async_send(&ps->proactor->async); /* Wake leader */ + uv_mutex_unlock(&ps->proactor->lock); +} + +static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) { + pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc)); + if (!pc) return NULL; + if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) { + return NULL; + } + psocket_init(&pc->psocket, p, true, host, port); + if (server) { + pn_transport_set_server(pc->driver.transport); + } + pn_record_t *r = pn_connection_attachments(pc->driver.connection); + pn_record_def(r, PN_PROACTOR, PN_VOID); + pn_record_set(r, PN_PROACTOR, pc); + return pc; +} + +static pn_event_t *listener_batch_next(pn_event_batch_t *batch); +static pn_event_t *proactor_batch_next(pn_event_batch_t *batch); + +static inline pn_proactor_t *batch_proactor(pn_event_batch_t *batch) { + return (batch->next_event == proactor_batch_next) ? + (pn_proactor_t*)((char*)batch - offsetof(pn_proactor_t, batch)) : NULL; +} + +static inline pn_listener_t *batch_listener(pn_event_batch_t *batch) { + return (batch->next_event == listener_batch_next) ? + (pn_listener_t*)((char*)batch - offsetof(pn_listener_t, batch)) : NULL; +} + +static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) { + pn_connection_driver_t *d = pn_event_batch_connection_driver(batch); + return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL; +} + +static void leader_count(pn_proactor_t *p, int change) { + uv_mutex_lock(&p->lock); + p->count += change; + p->inactive = (p->count == 0); + uv_mutex_unlock(&p->lock); +} + +/* Free if there are no uv callbacks pending and no events */ +static void leader_pconnection_t_maybe_free(pconnection_t *pc) { + if (pn_connection_driver_has_event(&pc->driver)) { + leader_to_worker(&pc->psocket); /* Return to worker */ + } else if (!(pc->psocket.tcp.data || pc->write.data || pc->shutdown.data || pc->timer.data)) { + /* All UV requests are finished */ + pn_connection_driver_destroy(&pc->driver); + leader_count(pc->psocket.proactor, -1); + free(pc); + } +} + +/* Free if there are no uv callbacks pending and no events */ +static void leader_listener_maybe_free(pn_listener_t *l) { + if (pn_collector_peek(l->collector)) { + leader_to_worker(&l->psocket); /* Return to worker */ + } else if (!l->psocket.tcp.data) { + pn_condition_free(l->condition); + leader_count(l->psocket.proactor, -1); + free(l); + } +} + +/* Free if there are no uv callbacks pending and no events */ +static void leader_maybe_free(psocket_t *ps) { + if (ps->is_conn) { + leader_pconnection_t_maybe_free(as_pconnection_t(ps)); + } else { + leader_listener_maybe_free(as_listener(ps)); + } +} + +static void on_close(uv_handle_t *h) { + psocket_t *ps = (psocket_t*)h->data; + h->data = NULL; /* Mark closed */ + leader_maybe_free(ps); +} + +static void on_shutdown(uv_shutdown_t *shutdown, int err) { + psocket_t *ps = (psocket_t*)shutdown->data; + shutdown->data = NULL; /* Mark closed */ + leader_maybe_free(ps); +} + +static inline void leader_close(psocket_t *ps) { + if (ps->tcp.data && !uv_is_closing((uv_handle_t*)&ps->tcp)) { + uv_close((uv_handle_t*)&ps->tcp, on_close); + } + pconnection_t *pc = as_pconnection_t(ps); + if (pc) { + pn_connection_driver_close(&pc->driver); + if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { + uv_timer_stop(&pc->timer); + uv_close((uv_handle_t*)&pc->timer, on_close); + } + } + leader_maybe_free(ps); +} + +static pconnection_t *get_pconnection_t(pn_connection_t* c) { + if (!c) return NULL; + pn_record_t *r = pn_connection_attachments(c); + return (pconnection_t*) pn_record_get(r, PN_PROACTOR); +} + +static void leader_error(psocket_t *ps, int err, const char* what) { + if (ps->is_conn) { + pn_connection_driver_t *driver = &as_pconnection_t(ps)->driver; + pn_connection_driver_bind(driver); /* Bind so errors will be reported */ + pn_connection_driver_errorf(driver, COND_NAME, "%s %s:%s: %s", + what, fixstr(ps->host), fixstr(ps->port), + uv_strerror(err)); + pn_connection_driver_close(driver); + } else { + pn_listener_t *l = as_listener(ps); + pn_condition_format(l->condition, COND_NAME, "%s %s:%s: %s", + what, fixstr(ps->host), fixstr(ps->port), + uv_strerror(err)); + pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_CLOSE); + } + leader_to_worker(ps); /* Worker to handle the error */ +} + +/* uv-initialization */ +static int leader_init(psocket_t *ps) { + leader_count(ps->proactor, +1); + int err = uv_tcp_init(&ps->proactor->loop, &ps->tcp); + if (!err) { + pconnection_t *pc = as_pconnection_t(ps); + if (pc) { + pc->connect.data = ps; + int err = uv_timer_init(&ps->proactor->loop, &pc->timer); + if (!err) { + pc->timer.data = pc; + } + } + } + if (err) { + leader_error(ps, err, "initialization"); + } + return err; +} + +/* Common logic for on_connect and on_accept */ +static void leader_connect_accept(pconnection_t *pc, int err, const char *what) { + if (!err) { + leader_to_worker(&pc->psocket); + } else { + leader_error(&pc->psocket, err, what); + } +} + +static void on_connect(uv_connect_t *connect, int err) { + leader_connect_accept((pconnection_t*)connect->data, err, "on connect"); +} + +static void on_accept(uv_stream_t* server, int err) { + pn_listener_t *l = (pn_listener_t*) server->data; + if (err) { + leader_error(&l->psocket, err, "on accept"); + } + pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); + leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */ +} + +static void leader_accept(psocket_t *ps) { + pn_listener_t * l = as_listener(ps); + pconnection_t *pc = l->accepting; + l->accepting = NULL; + if (pc) { + int err = leader_init(&pc->psocket); + if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp); + leader_connect_accept(pc, err, "on accept"); + } +} + +static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) { + int err = leader_init(ps); + struct addrinfo hints = { 0 }; + if (server) hints.ai_flags = AI_PASSIVE; + if (!err) { + err = uv_getaddrinfo(&ps->proactor->loop, info, NULL, fixstr(ps->host), fixstr(ps->port), &hints); + } + return err; +} + +static void leader_connect(psocket_t *ps) { + pconnection_t *pc = as_pconnection_t(ps); + uv_getaddrinfo_t info; + int err = leader_resolve(ps, &info, false); + if (!err) { + err = uv_tcp_connect(&pc->connect, &pc->psocket.tcp, info.addrinfo->ai_addr, on_connect); + uv_freeaddrinfo(info.addrinfo); + } + if (err) { + leader_error(ps, err, "connect to"); + } +} + +static void leader_listen(psocket_t *ps) { + pn_listener_t *l = as_listener(ps); + uv_getaddrinfo_t info; + int err = leader_resolve(ps, &info, true); + if (!err) { + err = uv_tcp_bind(&l->psocket.tcp, info.addrinfo->ai_addr, 0); + uv_freeaddrinfo(info.addrinfo); + } + if (!err) err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept); + if (err) { + leader_error(ps, err, "listen on "); + } +} + +static void on_tick(uv_timer_t *timer) { + pconnection_t *pc = (pconnection_t*)timer->data; + pn_transport_t *t = pc->driver.transport; + if (pn_transport_get_idle_timeout(t) || pn_transport_get_remote_idle_timeout(t)) { + uv_timer_stop(&pc->timer); + uint64_t now = uv_now(pc->timer.loop); + uint64_t next = pn_transport_tick(t, now); + if (next) { + uv_timer_start(&pc->timer, on_tick, next - now, 0); + } + } +} + +static void on_read(uv_stream_t* stream, ssize_t nread, const uv_buf_t* buf) { + pconnection_t *pc = (pconnection_t*)stream->data; + if (nread >= 0) { + pn_connection_driver_read_done(&pc->driver, nread); + on_tick(&pc->timer); /* check for tick changes. */ + leader_to_worker(&pc->psocket); + /* Reading continues automatically until stopped. */ + } else if (nread == UV_EOF) { /* hangup */ + pn_connection_driver_read_close(&pc->driver); + leader_maybe_free(&pc->psocket); + } else { + leader_error(&pc->psocket, nread, "on read from"); + } +} + +static void on_write(uv_write_t* write, int err) { + pconnection_t *pc = (pconnection_t*)write->data; + write->data = NULL; + if (err == 0) { + pn_connection_driver_write_done(&pc->driver, pc->writing); + leader_to_worker(&pc->psocket); + } else if (err == UV_ECANCELED) { + leader_maybe_free(&pc->psocket); + } else { + leader_error(&pc->psocket, err, "on write to"); + } + pc->writing = 0; /* Need to send a new write request */ +} + +static void on_timeout(uv_timer_t *timer) { + pn_proactor_t *p = (pn_proactor_t*)timer->data; + uv_mutex_lock(&p->lock); + p->timeout_elapsed = true; + uv_mutex_unlock(&p->lock); +} + +// Read buffer allocation function for uv, just returns the transports read buffer. +static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) { + pconnection_t *pc = (pconnection_t*)stream->data; + pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); + *buf = uv_buf_init(rbuf.start, rbuf.size); +} + +static void leader_rewatch(psocket_t *ps) { + int err = 0; + if (ps->is_conn) { + pconnection_t *pc = as_pconnection_t(ps); + if (pc->timer.data) { /* uv-initialized */ + on_tick(&pc->timer); /* Re-enable ticks if required */ + } + pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); + pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); + + /* Ticks and checking buffers can generate events, process before proceeding */ + if (pn_connection_driver_has_event(&pc->driver)) { + leader_to_worker(ps); + } else { /* Re-watch for IO */ + if (wbuf.size > 0 && !pc->writing) { + pc->writing = wbuf.size; + uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size); + pc->write.data = ps; + uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write); + } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) { + pc->shutdown.data = ps; + uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown); + } + if (rbuf.size > 0 && !pc->reading) { + pc->reading = true; + err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read); + } + } + } else { + pn_listener_t *l = as_listener(ps); + err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept); + } + if (err) { + leader_error(ps, err, "rewatch"); + } +} + +/* Set the event in the proactor's batch */ +static pn_event_batch_t *proactor_batch_lh(pn_proactor_t *p, pn_event_type_t t) { + pn_collector_put(p->collector, pn_proactor__class(), p, t); + p->batch_working = true; + return &p->batch; +} + +/* Return the next event batch or 0 if no events are ready */ +static pn_event_batch_t* get_batch_lh(pn_proactor_t *p) { + if (!p->batch_working) { /* Can generate proactor events */ + if (p->inactive) { + p->inactive = false; + return proactor_batch_lh(p, PN_PROACTOR_INACTIVE); + } + if (p->interrupt > 0) { + --p->interrupt; + return proactor_batch_lh(p, PN_PROACTOR_INTERRUPT); + } + if (p->timeout_elapsed) { + p->timeout_elapsed = false; + return proactor_batch_lh(p, PN_PROACTOR_TIMEOUT); + } + } + for (psocket_t *ps = pop_lh(&p->worker_q); ps; ps = pop_lh(&p->worker_q)) { + if (ps->is_conn) { + pconnection_t *pc = as_pconnection_t(ps); + return &pc->driver.batch; + } else { /* Listener */ + pn_listener_t *l = as_listener(ps); + return &l->batch; + } + to_leader(ps); /* No event, back to leader */ + } + return 0; +} + +/* Called in any thread to set a wakeup action. Replaces any previous wakeup action. */ +static void wakeup(psocket_t *ps, void (*action)(psocket_t*)) { + uv_mutex_lock(&ps->proactor->lock); + ps->wakeup = action; + to_leader_lh(ps); + uv_mutex_unlock(&ps->proactor->lock); +} + +pn_listener_t *pn_event_listener(pn_event_t *e) { + return (pn_event_class(e) == pn_listener__class()) ? (pn_listener_t*)pn_event_context(e) : NULL; +} + +pn_proactor_t *pn_event_proactor(pn_event_t *e) { + if (pn_event_class(e) == pn_proactor__class()) return (pn_proactor_t*)pn_event_context(e); + pn_listener_t *l = pn_event_listener(e); + if (l) return l->psocket.proactor; + pn_connection_t *c = pn_event_connection(e); + if (c) return pn_connection_proactor(pn_event_connection(e)); + return NULL; +} + +void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { + pconnection_t *pc = batch_pconnection(batch); + if (pc) { + if (pn_connection_driver_has_event(&pc->driver)) { + /* Process all events before going back to IO. */ + worker_requeue(&pc->psocket); + } else if (pn_connection_driver_finished(&pc->driver)) { + owner_to_leader(&pc->psocket, leader_close); + } else { + owner_to_leader(&pc->psocket, leader_rewatch); + } + return; + } + pn_listener_t *l = batch_listener(batch); + if (l) { + owner_to_leader(&l->psocket, leader_rewatch); + return; + } + pn_proactor_t *bp = batch_proactor(batch); + if (bp == p) { + uv_mutex_lock(&p->lock); + p->batch_working = false; + uv_async_send(&p->async); /* Wake leader */ + uv_mutex_unlock(&p->lock); + return; + } +} + +/* Run follower/leader loop till we can return an event and be a worker */ +pn_event_batch_t *pn_proactor_wait(struct pn_proactor_t* p) { + uv_mutex_lock(&p->lock); + /* Try to grab work immediately. */ + pn_event_batch_t *batch = get_batch_lh(p); + if (batch == NULL) { + /* No work available, follow the leader */ + while (p->has_leader) { + uv_cond_wait(&p->cond, &p->lock); + } + /* Lead till there is work to do. */ + p->has_leader = true; + while (batch == NULL) { + if (p->timeout_request) { + p->timeout_request = false; + if (p->timeout) { + uv_timer_start(&p->timer, on_timeout, p->timeout, 0); + } else { + uv_timer_stop(&p->timer); + } + } + for (psocket_t *ps = pop_lh(&p->leader_q); ps; ps = pop_lh(&p->leader_q)) { + void (*action)(psocket_t*) = ps->action; + void (*wakeup)(psocket_t*) = ps->wakeup; + ps->action = NULL; + ps->wakeup = NULL; + if (action || wakeup) { + uv_mutex_unlock(&p->lock); + if (action) action(ps); + if (wakeup) wakeup(ps); + uv_mutex_lock(&p->lock); + } + } + batch = get_batch_lh(p); + if (batch == NULL) { + uv_mutex_unlock(&p->lock); + uv_run(&p->loop, UV_RUN_ONCE); + uv_mutex_lock(&p->lock); + } + } + /* Signal the next leader and return to work */ + p->has_leader = false; + uv_cond_signal(&p->cond); + } + uv_mutex_unlock(&p->lock); + return batch; +} + +void pn_proactor_interrupt(pn_proactor_t *p) { + uv_mutex_lock(&p->lock); + ++p->interrupt; + uv_async_send(&p->async); /* Interrupt the UV loop */ + uv_mutex_unlock(&p->lock); +} + +void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { + uv_mutex_lock(&p->lock); + p->timeout = t; + p->timeout_request = true; + uv_async_send(&p->async); /* Interrupt the UV loop */ + uv_mutex_unlock(&p->lock); +} + +int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) { + pconnection_t *pc = new_pconnection_t(p, c, false, host, port); + if (!pc) { + return PN_OUT_OF_MEMORY; + } + /* Process PN_CONNECTION_INIT before binding */ + owner_to_worker(&pc->psocket, leader_connect); + return 0; +} + +int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog) +{ + psocket_init(&l->psocket, p, false, host, port); + l->backlog = backlog; + owner_to_leader(&l->psocket, leader_listen); + return 0; +} + +pn_proactor_t *pn_connection_proactor(pn_connection_t* c) { + pconnection_t *pc = get_pconnection_t(c); + return pc ? pc->psocket.proactor : NULL; +} + +void leader_wake_connection(psocket_t *ps) { + pconnection_t *pc = as_pconnection_t(ps); + pn_connection_t *c = pc->driver.connection; + pn_collector_put(pn_connection_collector(c), PN_OBJECT, c, PN_CONNECTION_WAKE); + leader_to_worker(ps); +} + +void pn_connection_wake(pn_connection_t* c) { + wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection); +} + +pn_proactor_t *pn_proactor() { + pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p)); + p->collector = pn_collector(); + p->batch.next_event = &proactor_batch_next; + if (!p->collector) return NULL; + uv_loop_init(&p->loop); + uv_mutex_init(&p->lock); + uv_cond_init(&p->cond); + uv_async_init(&p->loop, &p->async, NULL); + uv_timer_init(&p->loop, &p->timer); /* Just wake the loop */ + p->timer.data = p; + return p; +} + +static void on_stopping(uv_handle_t* h, void* v) { + uv_close(h, NULL); /* Close this handle */ + if (!uv_loop_alive(h->loop)) /* Everything closed */ + uv_stop(h->loop); /* Stop the loop, pn_proactor_destroy() can return */ +} + +void pn_proactor_free(pn_proactor_t *p) { + uv_walk(&p->loop, on_stopping, NULL); /* Close all handles */ + uv_run(&p->loop, UV_RUN_DEFAULT); /* Run till stop, all handles closed */ + uv_loop_close(&p->loop); + uv_mutex_destroy(&p->lock); + uv_cond_destroy(&p->cond); + pn_collector_free(p->collector); + free(p); +} + +static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { + pn_listener_t *l = batch_listener(batch); + pn_event_t *handled = pn_collector_prev(l->collector); + if (handled && pn_event_type(handled) == PN_LISTENER_CLOSE) { + owner_to_leader(&l->psocket, leader_close); /* Close event handled, do close */ + } + return pn_collector_next(l->collector); +} + +static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { + return pn_collector_next(batch_proactor(batch)->collector); +} + +static void pn_listener_free(pn_listener_t *l) { + if (l) { + if (!l->collector) pn_collector_free(l->collector); + if (!l->condition) pn_condition_free(l->condition); + if (!l->attachments) pn_free(l->attachments); + free(l); + } +} + +pn_listener_t *pn_listener() { + pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t)); + if (l) { + l->batch.next_event = listener_batch_next; + l->collector = pn_collector(); + l->condition = pn_condition(); + l->attachments = pn_record(); + if (!l->condition || !l->collector || !l->attachments) { + pn_listener_free(l); + return NULL; + } + } + return l; +} + +void pn_listener_close(pn_listener_t* l) { + wakeup(&l->psocket, leader_close); +} + +pn_proactor_t *pn_listener_proactor(pn_listener_t* l) { + return l ? l->psocket.proactor : NULL; +} + +pn_condition_t* pn_listener_condition(pn_listener_t* l) { + return l->condition; +} + +void *pn_listener_get_context(pn_listener_t *l) { + return l->context; +} + +void pn_listener_set_context(pn_listener_t *l, void *context) { + l->context = context; +} + +pn_record_t *pn_listener_attachments(pn_listener_t *l) { + return l->attachments; +} + +int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { + if (l->accepting) { + return PN_STATE_ERR; /* Only one at a time */ + } + l->accepting = new_pconnection_t( + l->psocket.proactor, c, true, l->psocket.host, l->psocket.port); + if (!l->accepting) { + return UV_ENOMEM; + } + owner_to_leader(&l->psocket, leader_accept); + return 0; +} + --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
