PROTON-1344: proactor listener/conneciton configuration Dropped extra bytes mechanism, may be re-introduced later.
Added context and attachments to pn_listener_t, consistent with pn_connection_t Configure connection/listener before calling proactor connect/listen. Added PN_LISTENER_ACCEPT event so accepted connections can be configured. Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/aadfcbbb Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/aadfcbbb Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/aadfcbbb Branch: refs/heads/master Commit: aadfcbbb7a7b75eb442df4d4de8ef97eb2e7a754 Parents: f2c8a3a Author: Alan Conway <[email protected]> Authored: Thu Nov 17 00:14:12 2016 -0500 Committer: Alan Conway <[email protected]> Committed: Thu Nov 17 11:22:50 2016 -0500 ---------------------------------------------------------------------- examples/c/proactor/broker.c | 33 ++-- examples/c/proactor/libuv_proactor.c | 246 +++++++++++++++++------------- examples/c/proactor/receive.c | 2 +- examples/c/proactor/send.c | 2 +- proton-c/include/proton/connection.h | 12 -- proton-c/include/proton/event.h | 12 +- proton-c/include/proton/extra.h | 69 --------- proton-c/include/proton/listener.h | 43 +++++- proton-c/include/proton/proactor.h | 32 ++-- proton-c/src/core/engine.c | 16 +- 10 files changed, 226 insertions(+), 241 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/examples/c/proactor/broker.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/broker.c b/examples/c/proactor/broker.c index 66381fc..ca52336 100644 --- a/examples/c/proactor/broker.c +++ b/examples/c/proactor/broker.c @@ -158,6 +158,15 @@ typedef struct broker_data_t { bool check_queues; /* Check senders on the connection for available data in queues. */ } broker_data_t; +/* Use the context pointer as a boolean flag to indicate we need to check queues */ +void pn_connection_set_check_queues(pn_connection_t *c, bool check) { + pn_connection_set_context(c, (void*)check); +} + +bool pn_connection_get_check_queues(pn_connection_t *c) { + return (bool)pn_connection_get_context(c); +} + /* Put a message on the queue, called in receiver dispatch loop. If the queue was previously empty, notify waiting senders. */ @@ -168,8 +177,7 @@ static void queue_receive(pn_proactor_t *d, queue_t *q, pn_rwbytes_t m) { if (q->messages.len == 1) { /* Was empty, notify waiting connections */ for (size_t i = 0; i < q->waiting.len; ++i) { pn_connection_t *c = q->waiting.data[i]; - broker_data_t *bd = (broker_data_t*)pn_connection_get_extra(c).start; - bd->check_queues = true; + pn_connection_set_check_queues(c, true); pn_connection_wake(c); /* Wake the connection */ } q->waiting.len = 0; @@ -215,7 +223,6 @@ queue_t* queues_get(queues_t *qs, const char* name) { /* The broker implementation */ typedef struct broker_t { pn_proactor_t *proactor; - pn_listener_t *listener; queues_t queues; const char *container_id; /* AMQP container-id */ size_t threads; @@ -226,7 +233,6 @@ typedef struct broker_t { void broker_init(broker_t *b, const char *container_id, size_t threads, pn_millis_t heartbeat) { memset(b, 0, sizeof(*b)); b->proactor = pn_proactor(); - b->listener = NULL; queues_init(&b->queues); b->container_id = container_id; b->threads = threads; @@ -300,10 +306,14 @@ static void handle(broker_t* b, pn_event_t* e) { switch (pn_event_type(e)) { - case PN_CONNECTION_INIT: { + case PN_LISTENER_ACCEPT: + pn_listener_accept(pn_event_listener(e), pn_connection()); + break; + + case PN_CONNECTION_INIT: pn_connection_set_container(c, b->container_id); break; - } + case PN_CONNECTION_BOUND: { /* Turn off security */ pn_transport_t *t = pn_connection_transport(c); @@ -316,9 +326,8 @@ static void handle(broker_t* b, pn_event_t* e) { break; } case PN_CONNECTION_WAKE: { - broker_data_t *bd = (broker_data_t*)pn_connection_get_extra(c).start; - if (bd->check_queues) { - bd->check_queues = false; + if (pn_connection_get_check_queues(c)) { + pn_connection_set_check_queues(c, false); int flags = PN_LOCAL_ACTIVE&PN_REMOTE_ACTIVE; for (pn_link_t *l = pn_link_head(c, flags); l != NULL; l = pn_link_next(l, flags)) link_send(b, l); @@ -456,11 +465,7 @@ int main(int argc, char **argv) { */ const char *host = url ? pn_url_get_host(url) : "::"; const char *port = url ? pn_url_get_port(url) : "amqp"; - - /* Initial broker_data value copied to each accepted connection */ - broker_data_t bd = { false }; - b.listener = pn_proactor_listen(b.proactor, host, port, 16, - pn_bytes(sizeof(bd), (char*)&bd)); + pn_proactor_listen(b.proactor, pn_listener(), host, port, 16); printf("listening on '%s:%s' %zd threads\n", host, port, b.threads); if (url) pn_url_free(url); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/examples/c/proactor/libuv_proactor.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/libuv_proactor.c b/examples/c/proactor/libuv_proactor.c index a26c311..9770166 100644 --- a/examples/c/proactor/libuv_proactor.c +++ b/examples/c/proactor/libuv_proactor.c @@ -24,7 +24,6 @@ #include <proton/condition.h> #include <proton/connection_driver.h> #include <proton/engine.h> -#include <proton/extra.h> #include <proton/message.h> #include <proton/object.h> #include <proton/proactor.h> @@ -142,13 +141,15 @@ struct pn_listener_t { psocket_t psocket; /* Only used by owner thread */ + pconnection_t *accepting; /* accept in progress */ pn_condition_t *condition; pn_collector_t *collector; pn_event_batch_t batch; + pn_record_t *attachments; + void *context; size_t backlog; }; -PN_EXTRA_DECLARE(pn_listener_t); typedef struct queue { psocket_t *front, *back; } queue; @@ -222,24 +223,26 @@ static void to_leader(psocket_t *ps) { /* Detach from IO and put ps on the worker queue */ static void leader_to_worker(psocket_t *ps) { - pconnection_t *pc = as_pconnection_t(ps); - /* Don't detach if there are no events yet. */ - if (pc && pn_connection_driver_has_event(&pc->driver)) { - if (pc->writing) { - pc->writing = 0; - uv_cancel((uv_req_t*)&pc->write); - } - if (pc->reading) { - pc->reading = false; - uv_read_stop((uv_stream_t*)&pc->psocket.tcp); - } - if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { - uv_timer_stop(&pc->timer); + if (ps->is_conn) { + pconnection_t *pc = as_pconnection_t(ps); + /* Don't detach if there are no events yet. */ + if (pn_connection_driver_has_event(&pc->driver)) { + if (pc->writing) { + pc->writing = 0; + uv_cancel((uv_req_t*)&pc->write); + } + if (pc->reading) { + pc->reading = false; + uv_read_stop((uv_stream_t*)&pc->psocket.tcp); + } + if (pc->timer.data && !uv_is_closing((uv_handle_t*)&pc->timer)) { + uv_timer_stop(&pc->timer); + } } + } else { + pn_listener_t *l = as_listener(ps); + uv_read_stop((uv_stream_t*)&l->psocket.tcp); } - - /* Nothing to do for a listener, on_accept doesn't touch worker state. */ - uv_mutex_lock(&ps->proactor->lock); push_lh(&ps->proactor->worker_q, ps); uv_mutex_unlock(&ps->proactor->lock); @@ -275,15 +278,12 @@ static void worker_requeue(psocket_t* ps) { uv_mutex_unlock(&ps->proactor->lock); } -static pconnection_t *new_pconnection_t(pn_proactor_t *p, bool server, const char *host, const char *port, pn_bytes_t extra) { +static pconnection_t *new_pconnection_t(pn_proactor_t *p, pn_connection_t *c, bool server, const char *host, const char *port) { pconnection_t *pc = (pconnection_t*)calloc(1, sizeof(*pc)); if (!pc) return NULL; - if (pn_connection_driver_init(&pc->driver, pn_connection_with_extra(extra.size), NULL) != 0) { + if (pn_connection_driver_init(&pc->driver, c, NULL) != 0) { return NULL; } - if (extra.start && extra.size) { - memcpy(pn_connection_get_extra(pc->driver.connection).start, extra.start, extra.size); - } psocket_init(&pc->psocket, p, true, host, port); if (server) { pn_transport_set_server(pc->driver.transport); @@ -312,26 +312,6 @@ static inline pconnection_t *batch_pconnection(pn_event_batch_t *batch) { return d ? (pconnection_t*)((char*)d - offsetof(pconnection_t, driver)) : NULL; } -pn_listener_t *new_listener(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) { - pn_listener_t *l = (pn_listener_t*)calloc(1, PN_EXTRA_SIZEOF(pn_listener_t, extra.size)); - if (!l) { - return NULL; - } - l->collector = pn_collector(); - if (!l->collector) { - free(l); - return NULL; - } - if (extra.start && extra.size) { - memcpy(pn_listener_get_extra(l).start, extra.start, extra.size); - } - psocket_init(&l->psocket, p, false, host, port); - l->condition = pn_condition(); - l->batch.next_event = listener_batch_next; - l->backlog = backlog; - return l; -} - static void leader_count(pn_proactor_t *p, int change) { uv_mutex_lock(&p->lock); p->count += change; @@ -456,24 +436,23 @@ static void on_connect(uv_connect_t *connect, int err) { } static void on_accept(uv_stream_t* server, int err) { - pn_listener_t* l = (pn_listener_t*)server->data; - if (!err) { - pn_rwbytes_t v = pn_listener_get_extra(l); - pconnection_t *pc = new_pconnection_t(l->psocket.proactor, true, - fixstr(l->psocket.host), - fixstr(l->psocket.port), - pn_bytes(v.size, v.start)); - if (pc) { - int err2 = leader_init(&pc->psocket); - if (!err2) err2 = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp); - leader_connect_accept(pc, err2, "on accept"); - } else { - err = UV_ENOMEM; - } - } + pn_listener_t *l = (pn_listener_t*) server->data; if (err) { leader_error(&l->psocket, err, "on accept"); } + pn_collector_put(l->collector, pn_listener__class(), l, PN_LISTENER_ACCEPT); + leader_to_worker(&l->psocket); /* Let user call pn_listener_accept */ +} + +static void leader_accept(psocket_t *ps) { + pn_listener_t * l = as_listener(ps); + pconnection_t *pc = l->accepting; + l->accepting = NULL; + if (pc) { + int err = leader_init(&pc->psocket); + if (!err) err = uv_accept((uv_stream_t*)&l->psocket.tcp, (uv_stream_t*)&pc->psocket.tcp); + leader_connect_accept(pc, err, "on accept"); + } } static int leader_resolve(psocket_t *ps, uv_getaddrinfo_t *info, bool server) { @@ -570,31 +549,39 @@ static void alloc_read_buffer(uv_handle_t* stream, size_t size, uv_buf_t* buf) { } static void leader_rewatch(psocket_t *ps) { - pconnection_t *pc = as_pconnection_t(ps); - - if (pc->timer.data) { /* uv-initialized */ - on_tick(&pc->timer); /* Re-enable ticks if required */ - } - pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); - pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); - - /* Ticks and checking buffers can generate events, process before proceeding */ - if (pn_connection_driver_has_event(&pc->driver)) { - leader_to_worker(ps); - } else { /* Re-watch for IO */ - if (wbuf.size > 0 && !pc->writing) { - pc->writing = wbuf.size; - uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size); - pc->write.data = ps; - uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write); - } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) { - pc->shutdown.data = ps; - uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown); + int err = 0; + if (ps->is_conn) { + pconnection_t *pc = as_pconnection_t(ps); + if (pc->timer.data) { /* uv-initialized */ + on_tick(&pc->timer); /* Re-enable ticks if required */ } - if (rbuf.size > 0 && !pc->reading) { - pc->reading = true; - uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read); + pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(&pc->driver); + pn_bytes_t wbuf = pn_connection_driver_write_buffer(&pc->driver); + + /* Ticks and checking buffers can generate events, process before proceeding */ + if (pn_connection_driver_has_event(&pc->driver)) { + leader_to_worker(ps); + } else { /* Re-watch for IO */ + if (wbuf.size > 0 && !pc->writing) { + pc->writing = wbuf.size; + uv_buf_t buf = uv_buf_init((char*)wbuf.start, wbuf.size); + pc->write.data = ps; + uv_write(&pc->write, (uv_stream_t*)&pc->psocket.tcp, &buf, 1, on_write); + } else if (wbuf.size == 0 && pn_connection_driver_write_closed(&pc->driver)) { + pc->shutdown.data = ps; + uv_shutdown(&pc->shutdown, (uv_stream_t*)&pc->psocket.tcp, on_shutdown); + } + if (rbuf.size > 0 && !pc->reading) { + pc->reading = true; + err = uv_read_start((uv_stream_t*)&pc->psocket.tcp, alloc_read_buffer, on_read); + } } + } else { + pn_listener_t *l = as_listener(ps); + err = uv_listen((uv_stream_t*)&l->psocket.tcp, l->backlog, on_accept); + } + if (err) { + leader_error(ps, err, "rewatch"); } } @@ -668,6 +655,11 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { } return; } + pn_listener_t *l = batch_listener(batch); + if (l) { + owner_to_leader(&l->psocket, leader_rewatch); + return; + } pn_proactor_t *bp = batch_proactor(batch); if (bp == p) { uv_mutex_lock(&p->lock); @@ -676,7 +668,6 @@ void pn_proactor_done(pn_proactor_t *p, pn_event_batch_t *batch) { uv_mutex_unlock(&p->lock); return; } - /* Nothing extra to do for listener, it is always in the UV loop. */ } /* Run follower/leader loop till we can return an event and be a worker */ @@ -742,8 +733,8 @@ void pn_proactor_set_timeout(pn_proactor_t *p, pn_millis_t t) { uv_mutex_unlock(&p->lock); } -int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn_bytes_t extra) { - pconnection_t *pc = new_pconnection_t(p, false, host, port, extra); +int pn_proactor_connect(pn_proactor_t *p, pn_connection_t *c, const char *host, const char *port) { + pconnection_t *pc = new_pconnection_t(p, c, false, host, port); if (!pc) { return PN_OUT_OF_MEMORY; } @@ -752,12 +743,12 @@ int pn_proactor_connect(pn_proactor_t *p, const char *host, const char *port, pn return 0; } -pn_rwbytes_t pn_listener_get_extra(pn_listener_t *l) { return PN_EXTRA_GET(pn_listener_t, l); } - -pn_listener_t *pn_proactor_listen(pn_proactor_t *p, const char *host, const char *port, int backlog, pn_bytes_t extra) { - pn_listener_t *l = new_listener(p, host, port, backlog, extra); - if (l) owner_to_leader(&l->psocket, leader_listen); - return l; +int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *l, const char *host, const char *port, int backlog) +{ + psocket_init(&l->psocket, p, false, host, port); + l->backlog = backlog; + owner_to_leader(&l->psocket, leader_listen); + return 0; } pn_proactor_t *pn_connection_proactor(pn_connection_t* c) { @@ -765,10 +756,6 @@ pn_proactor_t *pn_connection_proactor(pn_connection_t* c) { return pc ? pc->psocket.proactor : NULL; } -pn_proactor_t *pn_listener_proactor(pn_listener_t* l) { - return l ? l->psocket.proactor : NULL; -} - void leader_wake_connection(psocket_t *ps) { pconnection_t *pc = as_pconnection_t(ps); pn_connection_t *c = pc->driver.connection; @@ -780,15 +767,6 @@ void pn_connection_wake(pn_connection_t* c) { wakeup(&get_pconnection_t(c)->psocket, leader_wake_connection); } -void pn_listener_close(pn_listener_t* l) { - wakeup(&l->psocket, leader_close); -} - -/* Only called when condition is closed by error. */ -pn_condition_t* pn_listener_condition(pn_listener_t* l) { - return l->condition; -} - pn_proactor_t *pn_proactor() { pn_proactor_t *p = (pn_proactor_t*)calloc(1, sizeof(*p)); p->collector = pn_collector(); @@ -831,3 +809,65 @@ static pn_event_t *listener_batch_next(pn_event_batch_t *batch) { static pn_event_t *proactor_batch_next(pn_event_batch_t *batch) { return pn_collector_next(batch_proactor(batch)->collector); } + +pn_listener_t *pn_listener() { + pn_listener_t *l = (pn_listener_t*)calloc(1, sizeof(pn_listener_t)); + if (l) { + l->batch.next_event = listener_batch_next; + l->collector = pn_collector(); + l->condition = pn_condition(); + l->attachments = pn_record(); + if (!l->condition || !l->collector || !l->attachments) { + pn_listener_free(l); + return NULL; + } + } + return l; +} + +void pn_listener_free(pn_listener_t *l) { + if (l) { + if (!l->collector) pn_collector_free(l->collector); + if (!l->condition) pn_condition_free(l->condition); + if (!l->attachments) pn_free(l->attachments); + free(l); + } +} + +void pn_listener_close(pn_listener_t* l) { + wakeup(&l->psocket, leader_close); +} + +pn_proactor_t *pn_listener_proactor(pn_listener_t* l) { + return l ? l->psocket.proactor : NULL; +} + +pn_condition_t* pn_listener_condition(pn_listener_t* l) { + return l->condition; +} + +void *pn_listener_get_context(pn_listener_t *l) { + return l->context; +} + +void pn_listener_set_context(pn_listener_t *l, void *context) { + l->context = context; +} + +pn_record_t *pn_listener_attachments(pn_listener_t *l) { + return l->attachments; +} + +int pn_listener_accept(pn_listener_t *l, pn_connection_t *c) { + if (l->accepting) { + return PN_STATE_ERR; /* Only one at a time */ + } + l->accepting = new_pconnection_t( + l->psocket.proactor, c, true, l->psocket.host, l->psocket.port); + if (!l->accepting) { + return UV_ENOMEM; + } + owner_to_leader(&l->psocket, leader_accept); + return 0; +} + http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/examples/c/proactor/receive.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/receive.c b/examples/c/proactor/receive.c index 88e3456..b8edcd6 100644 --- a/examples/c/proactor/receive.c +++ b/examples/c/proactor/receive.c @@ -187,7 +187,7 @@ int main(int argc, char **argv) { /* Create the proactor and connect */ app.proactor = pn_proactor(); - pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null); + pn_proactor_connect(app.proactor, pn_connection(), host, port); if (url) pn_url_free(url); do { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/examples/c/proactor/send.c ---------------------------------------------------------------------- diff --git a/examples/c/proactor/send.c b/examples/c/proactor/send.c index 42facb0..d611b3d 100644 --- a/examples/c/proactor/send.c +++ b/examples/c/proactor/send.c @@ -216,7 +216,7 @@ int main(int argc, char **argv) { /* Create the proactor and connect */ app.proactor = pn_proactor(); - pn_proactor_connect(app.proactor, host, port, pn_rwbytes_null); + pn_proactor_connect(app.proactor, pn_connection(), host, port); if (url) pn_url_free(url); do { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/connection.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h index 70fad73..5b966cd 100644 --- a/proton-c/include/proton/connection.h +++ b/proton-c/include/proton/connection.h @@ -156,7 +156,6 @@ PN_EXTERN pn_collector_t* pn_connection_collector(pn_connection_t *connection); /** - * @deprecated * Get the application context that is associated with a connection * object. * @@ -169,7 +168,6 @@ PN_EXTERN pn_collector_t* pn_connection_collector(pn_connection_t *connection); PN_EXTERN void *pn_connection_get_context(pn_connection_t *connection); /** - * @deprecated * Set a new application context for a connection object. * * The application context for a connection object may be retrieved @@ -485,16 +483,6 @@ PN_EXTERN pn_data_t *pn_connection_remote_properties(pn_connection_t *connection */ PN_EXTERN pn_transport_t *pn_connection_transport(pn_connection_t *connection); -/** - * Create a connection with `size` bytes of extra aligned storage in the same heap block. - */ -PN_EXTERN pn_connection_t* pn_connection_with_extra(size_t size); - -/** - * Get the start and size of extra storage allocated by pn_connection_extra() - */ -PN_EXTERN pn_rwbytes_t pn_connection_get_extra(pn_connection_t *connection); - /** @} */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/event.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h index 31d4bdd..7793f1c 100644 --- a/proton-c/include/proton/event.h +++ b/proton-c/include/proton/event.h @@ -323,19 +323,25 @@ typedef enum { PN_CONNECTION_WAKE, /** - * pn_listener_close() was called or an error occurred, see pn_listener_condition() + * Indicates the listener is ready to call pn_listener_accept() + * Events of this type point to the @ref pn_listener_t. + */ + PN_LISTENER_ACCEPT, + + /** + * Indicates the listener has closed. pn_listener_condition() provides error information. * Events of this type point to the @ref pn_listener_t. */ PN_LISTENER_CLOSE, /** - * pn_proactor_interrupt() was called to interrupt a proactor thread + * Indicates pn_proactor_interrupt() was called to interrupt a proactor thread * Events of this type point to the @ref pn_proactor_t. */ PN_PROACTOR_INTERRUPT, /** - * pn_proactor_set_timeout() time limit expired. + * Timeout set by pn_proactor_set_timeout() time limit expired. * Events of this type point to the @ref pn_proactor_t. */ PN_PROACTOR_TIMEOUT, http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/extra.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/extra.h b/proton-c/include/proton/extra.h deleted file mode 100644 index ea2e1ef..0000000 --- a/proton-c/include/proton/extra.h +++ /dev/null @@ -1,69 +0,0 @@ -#ifndef EXTRA_H -#define EXTRA_H - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include <proton/type_compat.h> -#include <proton/types.h> -#include <stddef.h> -#include <stdlib.h> - -/** - * @cond INTERNAL - * Support for allocating extra aligned memory after a type. - */ - -#ifdef __cplusplus -extern "C" { -#endif - -/** - * extra_t contains a size and is maximally aligned so the memory immediately - * after it can store any type of value. - */ -typedef union pn_extra_t { - size_t size; -#if __STDC_VERSION__ >= 201112 - max_align_t max; -#else -/* Not standard but fairly safe */ - uint64_t i; - long double d; - void *v; - void (*fp)(void); -#endif -} pn_extra_t; - -static inline pn_rwbytes_t pn_extra_rwbytes(pn_extra_t *x) { - return pn_rwbytes(x->size, (char*)(x+1)); -} - -/* Declare private helper struct for T */ -#define PN_EXTRA_DECLARE(T) typedef struct T##__extra { T base; pn_extra_t extra; } T##__extra -#define PN_EXTRA_SIZEOF(T, N) (sizeof(T##__extra)+(N)) -#define PN_EXTRA_GET(T, P) pn_extra_rwbytes(&((T##__extra*)(P))->extra) - -#ifdef __cplusplus -} -#endif - -/** @endcond */ - -#endif // EXTRA_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/listener.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h index f55479b..5e60649 100644 --- a/proton-c/include/proton/listener.h +++ b/proton-c/include/proton/listener.h @@ -40,30 +40,57 @@ typedef struct pn_proactor_t pn_proactor_t; typedef struct pn_condition_t pn_condition_t; /** - * Listener accepts connections, see pn_proactor_listen() + * A listener accepts connections. */ typedef struct pn_listener_t pn_listener_t; /** - * The proactor that created the listener. + * Create a listener. */ -pn_proactor_t *pn_listener_proactor(pn_listener_t *c); +PN_EXTERN pn_listener_t *pn_listener(void); + +/** + * Free a listener + */ +PN_EXTERN void pn_listener_free(pn_listener_t*); + +/** + * Asynchronously accept a connection using the listener. + * + * @param[in] connection the listener takes ownership, do not free. + */ +PN_EXTERN int pn_listener_accept(pn_listener_t*, pn_connection_t *connection); /** * Get the error condition for a listener. */ -pn_condition_t *pn_listener_condition(pn_listener_t *l); +PN_EXTERN pn_condition_t *pn_listener_condition(pn_listener_t *l); /** - * Get the user-provided value associated with the listener in pn_proactor_listen() - * The start address is aligned so you can cast it to any type. + * Get the application context that is associated with a listener. */ -pn_rwbytes_t pn_listener_get_extra(pn_listener_t*); +PN_EXTERN void *pn_listener_get_context(pn_listener_t *listener); + +/** + * Set a new application context for a listener. + */ +PN_EXTERN void pn_listener_set_context(pn_listener_t *listener, void *context); + +/** + * Get the attachments that are associated with a listener object. + */ +PN_EXTERN pn_record_t *pn_listener_attachments(pn_listener_t *listener); /** * Close the listener (thread safe). */ -void pn_listener_close(pn_listener_t *l); +PN_EXTERN void pn_listener_close(pn_listener_t *l); + +/** + * The proactor associated with a listener. + */ +PN_EXTERN pn_proactor_t *pn_listener_proactor(pn_listener_t *c); + /** *@} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/include/proton/proactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h index e23a24f..9d39c9c 100644 --- a/proton-c/include/proton/proactor.h +++ b/proton-c/include/proton/proactor.h @@ -68,30 +68,30 @@ pn_proactor_t *pn_proactor(void); void pn_proactor_free(pn_proactor_t*); /** - * Asynchronous connect: a connection and transport will be created, the - * relevant events will be returned by pn_proactor_wait() + * Connect connection to host/port. Connection and transport events will be + * returned by pn_proactor_wait() * - * Errors are indicated by PN_TRANSPORT_ERROR/PN_TRANSPORT_CLOSE events. + * @param[in] connection the proactor takes ownership do not free. + * @param[in] host the address to listen on + * @param[in] port the port to connect to * - * @param extra bytes to copy to pn_connection_get_extra() on the new connection, @ref - * pn_rwbytes_null for nothing. - * - * @return error if the connect cannot be initiated e.g. an allocation failure. - * IO errors will be returned as transport events via pn_proactor_wait() + * @return error on immediate error, e.g. an allocation failure. + * Other errors are indicated by connection or transport events via pn_proactor_wait() */ -int pn_proactor_connect(pn_proactor_t*, const char *host, const char *port, pn_bytes_t extra); +int pn_proactor_connect(pn_proactor_t*, pn_connection_t *connection, const char *host, const char *port); /** - * Asynchronous listen: start listening, connections will be returned by pn_proactor_wait() - * An error are indicated by PN_LISTENER_ERROR event. + * Start listening with listener. + * pn_proactor_wait() will return a PN_LISTENER_ACCEPT event when a connection can be accepted. * - * @param extra bytes to copy to pn_connection_get_extra() on the new connection, @ref - * pn_rwbytes_null for nothing. + * @param[in] listener proactor takes ownership of listener, do not free. + * @param[in] host the address to listen on + * @param[in] port the port to listen on * - * @return error if the connect cannot be initiated e.g. an allocation failure. - * IO errors will be returned as transport events via pn_proactor_wait() + * @return error on immediate error, e.g. an allocation failure. + * Other errors are indicated by pn_listener_condition() on the PN_LISTENER_CLOSE event. */ -pn_listener_t *pn_proactor_listen(pn_proactor_t *, const char *host, const char *port, int backlog, pn_bytes_t extra); +int pn_proactor_listen(pn_proactor_t *p, pn_listener_t *listener, const char *host, const char *port, int backlog); /** * Wait for events to handle. Call pn_proactor_done() after handling events. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/aadfcbbb/proton-c/src/core/engine.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/engine.c b/proton-c/src/core/engine.c index 2836a43..99d311b 100644 --- a/proton-c/src/core/engine.c +++ b/proton-c/src/core/engine.c @@ -32,9 +32,6 @@ #include "platform/platform_fmt.h" #include "transport.h" -#include <proton/extra.h> - - static void pni_session_bound(pn_session_t *ssn); static void pni_link_bound(pn_link_t *link); @@ -511,15 +508,10 @@ static void pn_connection_finalize(void *object) #define pn_connection_compare NULL #define pn_connection_inspect NULL -PN_EXTRA_DECLARE(pn_connection_t); - -pn_rwbytes_t pn_connection_get_extra(pn_connection_t *c) { return PN_EXTRA_GET(pn_connection_t, c); } - -pn_connection_t *pn_connection_with_extra(size_t extra) +pn_connection_t *pn_connection() { static const pn_class_t clazz = PN_CLASS(pn_connection); - size_t size = PN_EXTRA_SIZEOF(pn_connection_t, extra); - pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, size); + pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t)); if (!conn) return NULL; conn->endpoint_head = NULL; @@ -548,10 +540,6 @@ pn_connection_t *pn_connection_with_extra(size_t extra) return conn; } -pn_connection_t *pn_connection(void) { - return pn_connection_with_extra(0); -} - static const pn_event_type_t endpoint_init_event_map[] = { PN_CONNECTION_INIT, /* CONNECTION */ PN_SESSION_INIT, /* SESSION */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
