initial commit of C reactor
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/ebe4e3d3 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/ebe4e3d3 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/ebe4e3d3 Branch: refs/heads/master Commit: ebe4e3d3676d8fcceb3016d7e0b2abb2deff7927 Parents: 86e08ba Author: Rafael Schloming <[email protected]> Authored: Sun Jan 4 22:38:28 2015 -0500 Committer: Rafael Schloming <[email protected]> Committed: Sun Jan 4 22:38:28 2015 -0500 ---------------------------------------------------------------------- proton-c/CMakeLists.txt | 8 + proton-c/include/proton/cid.h | 3 + proton-c/include/proton/event.h | 11 + proton-c/include/proton/handlers.h | 59 +++++ proton-c/include/proton/reactor.h | 83 ++++++++ proton-c/src/events/event.c | 4 + proton-c/src/handlers/flowcontroller.c | 65 ++++++ proton-c/src/handlers/handshaker.c | 103 +++++++++ proton-c/src/messenger/messenger.c | 2 + proton-c/src/reactor/acceptor.c | 80 +++++++ proton-c/src/reactor/connection.c | 207 ++++++++++++++++++ proton-c/src/reactor/handler.c | 103 +++++++++ proton-c/src/reactor/reactor.c | 216 +++++++++++++++++++ proton-c/src/tests/CMakeLists.txt | 1 + proton-c/src/tests/reactor.c | 320 ++++++++++++++++++++++++++++ 15 files changed, 1265 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/CMakeLists.txt b/proton-c/CMakeLists.txt index 7e58b4c..35df99f 100644 --- a/proton-c/CMakeLists.txt +++ b/proton-c/CMakeLists.txt @@ -304,6 +304,14 @@ set (qpid-proton-core src/message/message.c src/sasl/sasl.c + src/reactor/reactor.c + src/reactor/handler.c + src/reactor/connection.c + src/reactor/acceptor.c + + src/handlers/handshaker.c + src/handlers/flowcontroller.c + src/messenger/messenger.c src/messenger/subscription.c src/messenger/store.c http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/include/proton/cid.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/cid.h b/proton-c/include/proton/cid.h index 5a06308..ca6172f 100644 --- a/proton-c/include/proton/cid.h +++ b/proton-c/include/proton/cid.h @@ -47,6 +47,9 @@ typedef enum { CID_pn_message, + CID_pn_reactor, + CID_pn_handler, + CID_pn_io, CID_pn_selector, CID_pn_selectable, http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/include/proton/event.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h index c843fee..28c3313 100644 --- a/proton-c/include/proton/event.h +++ b/proton-c/include/proton/event.h @@ -24,6 +24,7 @@ #include <proton/import_export.h> #include <proton/type_compat.h> +#include <proton/object.h> #include <stddef.h> #include <sys/types.h> @@ -87,6 +88,16 @@ typedef enum { PN_EVENT_NONE = 0, /** + * A reactor has been started. Events of this type point to the reactor. + */ + PN_REACTOR_INIT, + + /** + * A reactor has been stopped. Events of this type point to the reactor. + */ + PN_REACTOR_FINAL, + + /** * The connection has been created. This is the first event that * will ever be issued for a connection. Events of this type point * to the relevant connection. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/include/proton/handlers.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/handlers.h b/proton-c/include/proton/handlers.h new file mode 100644 index 0000000..d2d732b --- /dev/null +++ b/proton-c/include/proton/handlers.h @@ -0,0 +1,59 @@ +#ifndef PROTON_HANDLERS_H +#define PROTON_HANDLERS_H 1 + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/import_export.h> +#include <proton/type_compat.h> +#include <proton/reactor.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @file + * + * Reactor API for proton. + * + * @defgroup handlers Handlers + * @ingroup handlers + * @{ + */ + +typedef struct pn_handshaker_t pn_handshaker_t; +typedef struct pn_flowcontroller_t pn_flowcontroller_t; + +PN_EXTERN pn_handshaker_t *pn_handshaker(void); +PN_EXTERN pn_handler_t *pn_handshaker_handler(pn_handshaker_t *handshaker); + +PN_EXTERN pn_flowcontroller_t *pn_flowcontroller(int window); +PN_EXTERN pn_handler_t *pn_flowcontroller_handler(pn_flowcontroller_t *flowcontroller); + +/** @} + */ + +#ifdef __cplusplus +} +#endif + +#endif /* handlers.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/include/proton/reactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h new file mode 100644 index 0000000..e4cd84a --- /dev/null +++ b/proton-c/include/proton/reactor.h @@ -0,0 +1,83 @@ +#ifndef PROTON_REACTOR_H +#define PROTON_REACTOR_H 1 + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/import_export.h> +#include <proton/type_compat.h> +#include <proton/event.h> +#include <proton/selectable.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @file + * + * Reactor API for proton. + * + * @defgroup reactor Reactor + * @ingroup reactor + * @{ + */ + +typedef struct pn_handler_t pn_handler_t; +typedef struct pn_reactor_t pn_reactor_t; +typedef struct pn_acceptor_t pn_acceptor_t; + +PN_EXTERN pn_handler_t *pn_handler(void (*dispatch)(pn_handler_t *, pn_event_t *)); +PN_EXTERN pn_handler_t *pn_handler_new(void (*dispatch)(pn_handler_t *, pn_event_t *), size_t size, + void (*finalize)(pn_handler_t *)); +PN_EXTERN void pn_handler_free(pn_handler_t *handler); +PN_EXTERN void *pn_handler_mem(pn_handler_t *handler); +PN_EXTERN pn_handler_t *pn_handler_cast(void *mem); +PN_EXTERN void pn_handler_add(pn_handler_t *handler, pn_handler_t *child); +PN_EXTERN void pn_handler_dispatch(pn_handler_t *handler, pn_event_t *event); + +PN_EXTERN pn_reactor_t *pn_reactor(void); +PN_EXTERN void pn_reactor_free(pn_reactor_t *reactor); +PN_EXTERN pn_collector_t *pn_reactor_collector(pn_reactor_t *reactor); +PN_EXTERN pn_handler_t *pn_reactor_handler(pn_reactor_t *reactor); +PN_EXTERN pn_io_t *pn_reactor_io(pn_reactor_t *reactor); +PN_EXTERN pn_selector_t *pn_reactor_selector(pn_reactor_t *reactor); +PN_EXTERN pn_list_t *pn_reactor_children(pn_reactor_t *reactor); +PN_EXTERN pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor); +PN_EXTERN void pn_reactor_update(pn_reactor_t *reactor, pn_selectable_t *selectable); +PN_EXTERN pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, const char *port, + pn_handler_t *handler); +PN_EXTERN pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, pn_handler_t *handler); +PN_EXTERN void pn_reactor_run(pn_reactor_t *reactor); + +PN_EXTERN void pn_acceptor_close(pn_reactor_t *reactor, pn_acceptor_t *acceptor); + +PN_EXTERN extern void *pni_handler; +#define PN_HANDLER ((pn_handle_t) &pni_handler) + +/** @} + */ + +#ifdef __cplusplus +} +#endif + +#endif /* reactor.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/events/event.c ---------------------------------------------------------------------- diff --git a/proton-c/src/events/event.c b/proton-c/src/events/event.c index 05518be..f90c2cd 100644 --- a/proton-c/src/events/event.c +++ b/proton-c/src/events/event.c @@ -232,6 +232,10 @@ const char *pn_event_type_name(pn_event_type_t type) switch (type) { case PN_EVENT_NONE: return "PN_EVENT_NONE"; + case PN_REACTOR_INIT: + return "PN_REACTOR_INIT"; + case PN_REACTOR_FINAL: + return "PN_REACTOR_FINAL"; case PN_CONNECTION_INIT: return "PN_CONNECTION_INIT"; case PN_CONNECTION_BOUND: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/handlers/flowcontroller.c ---------------------------------------------------------------------- diff --git a/proton-c/src/handlers/flowcontroller.c b/proton-c/src/handlers/flowcontroller.c new file mode 100644 index 0000000..2a164fe --- /dev/null +++ b/proton-c/src/handlers/flowcontroller.c @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/link.h> +#include <proton/handlers.h> +#include <assert.h> + +struct pn_flowcontroller_t { + int window; +}; + +static void pni_topup(pn_link_t *link, int window) { + int delta = window - pn_link_credit(link); + pn_link_flow(link, delta); +} + +static void pn_flowcontroller_dispatch(pn_handler_t *handler, pn_event_t *event) { + pn_flowcontroller_t *fc = (pn_flowcontroller_t *) pn_handler_mem(handler); + int window = fc->window; + + switch (pn_event_type(event)) { + case PN_LINK_LOCAL_OPEN: + case PN_LINK_REMOTE_OPEN: + case PN_LINK_FLOW: + case PN_DELIVERY: + { + pn_link_t *link = pn_event_link(event); + if (pn_link_is_receiver(link)) { + pni_topup(link, window); + } + } + break; + default: + break; + } +} + +pn_flowcontroller_t *pn_flowcontroller(int window) { + pn_handler_t *handler = pn_handler_new(pn_flowcontroller_dispatch, sizeof(pn_flowcontroller_t), NULL); + pn_flowcontroller_t *flowcontroller = (pn_flowcontroller_t *) pn_handler_mem(handler); + flowcontroller->window = window; + return flowcontroller; +} + +pn_handler_t *pn_flowcontroller_handler(pn_flowcontroller_t *flowcontroller) { + return pn_handler_cast(flowcontroller); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/handlers/handshaker.c ---------------------------------------------------------------------- diff --git a/proton-c/src/handlers/handshaker.c b/proton-c/src/handlers/handshaker.c new file mode 100644 index 0000000..647867a --- /dev/null +++ b/proton-c/src/handlers/handshaker.c @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/connection.h> +#include <proton/session.h> +#include <proton/link.h> +#include <proton/handlers.h> +#include <assert.h> + +struct pn_handshaker_t { + pn_map_t *handlers; +}; + +static void pn_handshaker_finalize(pn_handler_t *handler) { + pn_handshaker_t *handshaker = (pn_handshaker_t *) pn_handler_mem(handler); + pn_free(handshaker->handlers); +} + +static void pn_handshaker_dispatch(pn_handler_t *handler, pn_event_t *event) { + switch (pn_event_type(event)) { + case PN_CONNECTION_REMOTE_OPEN: + { + pn_connection_t *conn = pn_event_connection(event); + if (pn_connection_state(conn) & PN_LOCAL_UNINIT) { + pn_connection_open(conn); + } + } + break; + case PN_SESSION_REMOTE_OPEN: + { + pn_session_t *ssn = pn_event_session(event); + if (pn_session_state(ssn) & PN_LOCAL_UNINIT) { + pn_session_open(ssn); + } + } + break; + case PN_LINK_REMOTE_OPEN: + { + pn_link_t *link = pn_event_link(event); + if (pn_link_state(link) & PN_LOCAL_UNINIT) { + pn_terminus_copy(pn_link_source(link), pn_link_remote_source(link)); + pn_terminus_copy(pn_link_target(link), pn_link_remote_target(link)); + pn_link_open(link); + } + } + break; + case PN_CONNECTION_REMOTE_CLOSE: + { + pn_connection_t *conn = pn_event_connection(event); + if (!(pn_connection_state(conn) & PN_LOCAL_CLOSED)) { + pn_connection_close(conn); + } + } + break; + case PN_SESSION_REMOTE_CLOSE: + { + pn_session_t *ssn = pn_event_session(event); + if (!(pn_session_state(ssn) & PN_LOCAL_CLOSED)) { + pn_session_close(ssn); + } + } + break; + case PN_LINK_REMOTE_CLOSE: + { + pn_link_t *link = pn_event_link(event); + if (!(pn_link_state(link) & PN_LOCAL_CLOSED)) { + pn_link_close(link); + } + } + break; + default: + break; + } +} + +pn_handshaker_t *pn_handshaker(void) { + pn_handler_t *handler = pn_handler_new(pn_handshaker_dispatch, sizeof(pn_handshaker_t), pn_handshaker_finalize); + pn_handshaker_t *handshaker = (pn_handshaker_t *) pn_handler_mem(handler); + handshaker->handlers = NULL; + return handshaker; +} + +pn_handler_t *pn_handshaker_handler(pn_handshaker_t *handshaker) { + return pn_handler_cast(handshaker); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/messenger/messenger.c ---------------------------------------------------------------------- diff --git a/proton-c/src/messenger/messenger.c b/proton-c/src/messenger/messenger.c index 4cdd5e9..92b9003 100644 --- a/proton-c/src/messenger/messenger.c +++ b/proton-c/src/messenger/messenger.c @@ -1306,6 +1306,8 @@ int pn_messenger_process_events(pn_messenger_t *messenger) break; case PN_LINK_FINAL: break; + default: + break; } pn_collector_pop(messenger->collector); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/reactor/acceptor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/acceptor.c b/proton-c/src/reactor/acceptor.c new file mode 100644 index 0000000..8e49772 --- /dev/null +++ b/proton-c/src/reactor/acceptor.c @@ -0,0 +1,80 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/io.h> +#include <proton/reactor.h> +#include <proton/sasl.h> +#include <proton/selector.h> +#include <proton/transport.h> +#include "selectable.h" + +static ssize_t pni_acceptor_capacity(pn_selectable_t *sel) { + return 1; +} + +pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport); + +void pni_acceptor_readable(pn_selectable_t *sel) { + pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); + char name[1024]; + pn_socket_t sock = pn_accept(pn_reactor_io(reactor), pn_selectable_fd(sel), name, 1024); + pn_record_t *record = pn_selectable_attachments(sel); + pn_handler_t *handler = (pn_handler_t *) pn_record_get(record, PN_HANDLER); + if (!handler) { handler = pn_reactor_handler(reactor); } + pn_connection_t *conn = pn_reactor_connection(reactor, handler); + pn_transport_t *trans = pn_transport(); + pn_transport_set_server(trans); + pn_sasl_t *sasl = pn_sasl(trans); + pn_sasl_allow_skip(sasl, true); + pn_sasl_mechanisms(sasl, "ANONYMOUS"); + pn_sasl_done(sasl, PN_SASL_OK); + pn_transport_bind(trans, conn); + pn_reactor_selectable_transport(reactor, sock, trans); + pn_decref(trans); +} + +void pni_acceptor_finalize(pn_selectable_t *sel) { + pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); + pn_close(pn_reactor_io(reactor), pn_selectable_fd(sel)); +} + +pn_acceptor_t *pn_reactor_acceptor(pn_reactor_t *reactor, const char *host, const char *port, pn_handler_t *handler) { + pn_selectable_t *sel = pn_reactor_selectable(reactor); + pn_selectable_set_capacity(sel, pni_acceptor_capacity); + pn_selectable_set_readable(sel, pni_acceptor_readable); + pn_selectable_set_finalize(sel, pni_acceptor_finalize); + pn_socket_t socket = pn_listen(pn_reactor_io(reactor), host, port); + pni_selectable_set_fd(sel, socket); + pni_selectable_set_context(sel, reactor); + pn_record_t *record = pn_selectable_attachments(sel); + pn_record_def(record, PN_HANDLER, PN_OBJECT); + pn_record_set(record, PN_HANDLER, handler); + pn_reactor_update(reactor, sel); + return (pn_acceptor_t *) sel; +} + +void pn_acceptor_close(pn_reactor_t *reactor, pn_acceptor_t *acceptor) { + pn_selectable_t *sel = (pn_selectable_t *) acceptor; + pn_socket_t socket = pn_selectable_fd(sel); + pn_close(pn_reactor_io(reactor), socket); + pni_selectable_set_fd(sel, PN_INVALID_SOCKET); + pn_selector_remove(pn_reactor_selector(reactor), sel); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/reactor/connection.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/connection.c b/proton-c/src/reactor/connection.c new file mode 100644 index 0000000..2757ca5 --- /dev/null +++ b/proton-c/src/reactor/connection.c @@ -0,0 +1,207 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/connection.h> +#include <proton/object.h> +#include <proton/reactor.h> +#include <proton/sasl.h> +#include <proton/transport.h> +#include <assert.h> +#include <stdio.h> +#include <strings.h> +#include "selectable.h" + +// XXX: overloaded for both directions +static void *pni_transportctx = NULL; +#define PN_TRANCTX ((pn_handle_t) &pni_transportctx) + +void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event) { + assert(reactor); + pn_transport_t *transport = pn_event_transport(event); + pn_record_t *record = pn_transport_attachments(transport); + pn_selectable_t *sel = (pn_selectable_t *) pn_record_get(record, PN_TRANCTX); + if (sel && !pn_selectable_is_terminal(sel)) { + pn_reactor_update(reactor, sel); + } +} + +pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport); + +void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event) { + assert(reactor); + assert(event); + pn_connection_t *conn = pn_event_connection(event); + pn_transport_t *transport = pn_transport(); + pn_sasl_t *sasl = pn_sasl(transport); + pn_sasl_mechanisms(sasl, "ANONYMOUS"); + pn_transport_bind(transport, conn); + const char *hostname = pn_connection_get_hostname(conn); + pn_string_t *str = pn_string(hostname); + char *host = pn_string_buffer(str); + const char *port = "5672"; + char *colon = rindex(host, ':'); + if (colon) { + port = colon + 1; + colon[0] = '\0'; + } + pn_socket_t sock = pn_connect(pn_reactor_io(reactor), host, port); + pn_free(str); + pn_reactor_selectable_transport(reactor, sock, transport); + pn_decref(transport); +} + +void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event) { + assert(reactor); + assert(event); + pn_connection_t *conn = pn_event_connection(event); + pn_list_remove(pn_reactor_children(reactor), conn); +} + +static pn_transport_t *pni_transport(pn_selectable_t *sel) { + pn_record_t *record = pn_selectable_attachments(sel); + return (pn_transport_t *) pn_record_get(record, PN_TRANCTX); +} + +static ssize_t pni_connection_capacity(pn_selectable_t *sel) +{ + pn_transport_t *transport = pni_transport(sel); + ssize_t capacity = pn_transport_capacity(transport); + if (capacity < 0) { + if (pn_transport_closed(transport)) { + pni_selectable_set_terminal(sel, true); + } + } + return capacity; +} + +static ssize_t pni_connection_pending(pn_selectable_t *sel) +{ + pn_transport_t *transport = pni_transport(sel); + ssize_t pending = pn_transport_pending(transport); + if (pending < 0) { + if (pn_transport_closed(transport)) { + pni_selectable_set_terminal(sel, true); + } + } + return pending; +} + +static pn_timestamp_t pni_connection_deadline(pn_selectable_t *sel) +{ + return 0; +} + +static void pni_connection_readable(pn_selectable_t *sel) +{ + pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); + pn_transport_t *transport = pni_transport(sel); + ssize_t capacity = pn_transport_capacity(transport); + if (capacity > 0) { + ssize_t n = pn_recv(pn_reactor_io(reactor), pn_selectable_fd(sel), + pn_transport_tail(transport), capacity); + if (n <= 0) { + if (n == 0 || !pn_wouldblock(pn_reactor_io(reactor))) { + if (n < 0) perror("recv"); + pn_transport_close_tail(transport); + /*if (!(pn_connection_state(connection) & PN_REMOTE_CLOSED)) { + pn_error_report("CONNECTION", "connection aborted (remote)"); + }*/ + } + } else { + /*int err =*/ pn_transport_process(transport, (size_t)n); + /*if (err) + pn_error_copy(messenger->error, pn_transport_error(transport));*/ + } + } + + ssize_t newcap = pn_transport_capacity(transport); + if (newcap != capacity) { + pn_reactor_update(reactor, sel); + } +} + +static void pni_connection_writable(pn_selectable_t *sel) +{ + pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); + pn_transport_t *transport = pni_transport(sel); + ssize_t pending = pn_transport_pending(transport); + if (pending > 0) { + ssize_t n = pn_send(pn_reactor_io(reactor), pn_selectable_fd(sel), + pn_transport_head(transport), pending); + if (n < 0) { + if (!pn_wouldblock(pn_reactor_io(reactor))) { + perror("send"); + pn_transport_close_head(transport); + } + } else { + pn_transport_pop(transport, n); + } + } + + ssize_t newpending = pn_transport_pending(transport); + if (newpending != pending) { + pn_reactor_update(reactor, sel); + } +} + +static void pni_connection_expired(pn_selectable_t *sel) {} + +static void pni_connection_finalize(pn_selectable_t *sel) { + pn_reactor_t *reactor = (pn_reactor_t *) pni_selectable_get_context(sel); + pn_transport_t *transport = pni_transport(sel); + pn_record_t *record = pn_transport_attachments(transport); + pn_record_set(record, PN_TRANCTX, NULL); + pn_socket_t fd = pn_selectable_fd(sel); + pn_close(pn_reactor_io(reactor), fd); +} + +pn_selectable_t *pn_reactor_selectable_transport(pn_reactor_t *reactor, pn_socket_t sock, pn_transport_t *transport) { + pn_selectable_t *sel = pn_reactor_selectable(reactor); + pn_selectable_set_capacity(sel, pni_connection_capacity); + pn_selectable_set_pending(sel, pni_connection_pending); + pn_selectable_set_deadline(sel, pni_connection_deadline); + pn_selectable_set_readable(sel, pni_connection_readable); + pn_selectable_set_writable(sel, pni_connection_writable); + pn_selectable_set_expired(sel, pni_connection_expired); + pn_selectable_set_finalize(sel, pni_connection_finalize); + pni_selectable_set_fd(sel, sock); + pni_selectable_set_context(sel, reactor); + pn_record_t *record = pn_selectable_attachments(sel); + pn_record_def(record, PN_TRANCTX, PN_OBJECT); + pn_record_set(record, PN_TRANCTX, transport); + pn_record_t *tr = pn_transport_attachments(transport); + pn_record_def(tr, PN_TRANCTX, PN_WEAKREF); + pn_record_set(tr, PN_TRANCTX, sel); + pn_reactor_update(reactor, sel); + return sel; +} + +pn_connection_t *pn_reactor_connection(pn_reactor_t *reactor, pn_handler_t *handler) { + assert(reactor); + pn_connection_t *connection = pn_connection(); + pn_record_t *record = pn_connection_attachments(connection); + pn_record_def(record, PN_HANDLER, PN_OBJECT); + pn_record_set(record, PN_HANDLER, handler); + pn_connection_collect(connection, pn_reactor_collector(reactor)); + pn_list_add(pn_reactor_children(reactor), connection); + pn_decref(connection); + return connection; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/reactor/handler.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/handler.c b/proton-c/src/reactor/handler.c new file mode 100644 index 0000000..cbbbb5d --- /dev/null +++ b/proton-c/src/reactor/handler.c @@ -0,0 +1,103 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/object.h> +#include <proton/reactor.h> +#include <proton/event.h> +#include <assert.h> + +struct pn_handler_t { + void (*dispatch) (pn_handler_t *, pn_event_t *); + void (*finalize) (pn_handler_t *); + pn_list_t *children; +}; + +void pn_handler_initialize(void *object) { + pn_handler_t *handler = (pn_handler_t *) object; + handler->dispatch = NULL; + handler->children = NULL; +} + +void pn_handler_finalize(void *object) { + pn_handler_t *handler = (pn_handler_t *) object; + if (handler->finalize) { + handler->finalize(handler); + } + pn_free(handler->children); +} + +#define pn_handler_hashcode NULL +#define pn_handler_compare NULL +#define pn_handler_inspect NULL + +pn_handler_t *pn_handler(void (*dispatch)(pn_handler_t *, pn_event_t *)) { + return pn_handler_new(dispatch, 0, NULL); +} + +pn_handler_t *pn_handler_new(void (*dispatch)(pn_handler_t *, pn_event_t *), size_t size, void (*finalize)(pn_handler_t *)) { + static const pn_class_t clazz = PN_CLASS(pn_handler); + pn_handler_t *handler = (pn_handler_t *) pn_class_new(&clazz, sizeof(pn_handler_t) + size); + handler->dispatch = dispatch; + handler->finalize = finalize; + return handler; +} + +void pn_handler_free(pn_handler_t *handler) { + if (handler) { + if (handler->children) { + size_t n = pn_list_size(handler->children); + for (size_t i = 0; i < n; i++) { + void *child = pn_list_get(handler->children, i); + pn_decref(child); + } + } + + pn_decref(handler); + } +} + +void *pn_handler_mem(pn_handler_t *handler) { + return (void *) (handler + 1); +} + +pn_handler_t *pn_handler_cast(void *mem) { + return ((pn_handler_t *) mem) - 1; +} + +void pn_handler_add(pn_handler_t *handler, pn_handler_t *child) { + if (!handler->children) { + handler->children = pn_list(PN_OBJECT, 0); + } + pn_list_add(handler->children, child); +} + +void pn_handler_dispatch(pn_handler_t *handler, pn_event_t *event) { + if (handler->dispatch) { + handler->dispatch(handler, event); + } + if (handler->children) { + size_t n = pn_list_size(handler->children); + for (size_t i = 0; i < n; i++) { + pn_handler_t *child = (pn_handler_t *) pn_list_get(handler->children, i); + pn_handler_dispatch(child, event); + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/reactor/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/reactor.c b/proton-c/src/reactor/reactor.c new file mode 100644 index 0000000..dbf63e9 --- /dev/null +++ b/proton-c/src/reactor/reactor.c @@ -0,0 +1,216 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/object.h> +#include <proton/io.h> +#include <proton/selector.h> +#include <proton/event.h> +#include <proton/reactor.h> +#include <proton/transport.h> +#include <proton/connection.h> +#include <proton/session.h> +#include <proton/link.h> +#include <stdio.h> +#include <stdlib.h> +#include <assert.h> + +#include "selectable.h" + +struct pn_reactor_t { + pn_io_t *io; + pn_selector_t *selector; + pn_collector_t *collector; + pn_handler_t *handler; + pn_list_t *children; +}; + +void *pni_handler = NULL; + +static void pn_dummy_dispatch(pn_handler_t *handler, pn_event_t *event) { + /*pn_string_t *str = pn_string(NULL); + pn_inspect(event, str); + printf("%s\n", pn_string_get(str)); + pn_free(str);*/ +} + +static void pn_reactor_initialize(void *object) { + pn_reactor_t *reactor = (pn_reactor_t *) object; + reactor->io = pn_io(); + reactor->selector = pn_io_selector(reactor->io); + reactor->collector = pn_collector(); + reactor->handler = pn_handler(pn_dummy_dispatch); + reactor->children = pn_list(PN_OBJECT, 0); +} + +static void pn_reactor_finalize(void *object) { + pn_reactor_t *reactor = (pn_reactor_t *) object; + pn_selector_free(reactor->selector); + pn_io_free(reactor->io); + pn_collector_free(reactor->collector); + pn_free(reactor->handler); + pn_free(reactor->children); +} + +#define pn_reactor_hashcode NULL +#define pn_reactor_compare NULL +#define pn_reactor_inspect NULL + +pn_reactor_t *pn_reactor() { + static const pn_class_t clazz = PN_CLASS(pn_reactor); + return (pn_reactor_t *) pn_class_new(&clazz, sizeof(pn_reactor_t)); +} + +void pn_reactor_free(pn_reactor_t *reactor) { + if (reactor) { + pn_handler_free(reactor->handler); + reactor->handler = NULL; + pn_decref(reactor); + } +} + +pn_handler_t *pn_reactor_handler(pn_reactor_t *reactor) { + assert(reactor); + return reactor->handler; +} + +pn_selector_t *pn_reactor_selector(pn_reactor_t *reactor) { + assert(reactor); + return reactor->selector; +} + +pn_io_t *pn_reactor_io(pn_reactor_t *reactor) { + assert(reactor); + return reactor->io; +} + +pn_collector_t *pn_reactor_collector(pn_reactor_t *reactor) { + assert(reactor); + return reactor->collector; +} + +pn_list_t *pn_reactor_children(pn_reactor_t *reactor) { + assert(reactor); + return reactor->children; +} + +pn_selectable_t *pn_reactor_selectable(pn_reactor_t *reactor) { + assert(reactor); + pn_selectable_t *sel = pn_selectable(); + pn_selector_add(reactor->selector, sel); + pn_list_add(reactor->children, sel); + pn_decref(sel); + return sel; +} + +void pn_reactor_update(pn_reactor_t *reactor, pn_selectable_t *selectable) { + assert(reactor); + pn_selector_update(reactor->selector, selectable); +} + +void pni_handle_transport(pn_reactor_t *reactor, pn_event_t *event); +void pni_handle_open(pn_reactor_t *reactor, pn_event_t *event); +void pni_handle_final(pn_reactor_t *reactor, pn_event_t *event); + +static void pni_reactor_dispatch(pn_reactor_t *reactor, pn_event_t *event) { + assert(reactor); + switch (pn_event_type(event)) { + case PN_TRANSPORT: + pni_handle_transport(reactor, event); + break; + case PN_CONNECTION_LOCAL_OPEN: + pni_handle_open(reactor, event); + break; + case PN_CONNECTION_FINAL: + pni_handle_final(reactor, event); + break; + default: + break; + } +} + +pn_record_t *pni_attachments(const pn_class_t *clazz, void *instance) { + switch (pn_class_id(clazz)) { + case CID_pn_connection: + return pn_connection_attachments((pn_connection_t *) instance); + case CID_pn_session: + return pn_session_attachments((pn_session_t *) instance); + case CID_pn_link: + return pn_link_attachments((pn_link_t *) instance); + default: + return NULL; + } +} + +pn_handler_t *pn_event_handler(pn_event_t *event) { + pn_record_t *record = pni_attachments(pn_event_class(event), pn_event_context(event)); + if (record) { + return (pn_handler_t *) pn_record_get(record, PN_HANDLER); + } else { + return NULL; + } +} + +void pn_reactor_process(pn_reactor_t *reactor) { + assert(reactor); + pn_event_t *event; + while ((event = pn_collector_peek(reactor->collector))) { + pn_handler_t *handler = pn_event_handler(event); + if (!handler) { + handler = reactor->handler; + } + pn_handler_dispatch(handler, event); + pni_reactor_dispatch(reactor, event); + pn_collector_pop(reactor->collector); + } +} + +void pn_reactor_run(pn_reactor_t *reactor) { + assert(reactor); + pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_INIT); + while (true) { + pn_reactor_process(reactor); + + if (!pn_selector_size(reactor->selector)) { + break; + } + + pn_selector_select(reactor->selector, 1000); + pn_selectable_t *sel; + int events; + while ((sel = pn_selector_next(reactor->selector, &events))) { + if (events & PN_READABLE) { + pn_selectable_readable(sel); + } + if (events & PN_WRITABLE) { + pn_selectable_writable(sel); + } + if (events & PN_EXPIRED) { + pn_selectable_expired(sel); + } + if (pn_selectable_is_terminal(sel)) { + pn_selector_remove(reactor->selector, sel); + pn_list_remove(reactor->children, sel); + } + } + } + pn_collector_put(reactor->collector, PN_OBJECT, reactor, PN_REACTOR_FINAL); + pn_reactor_process(reactor); +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/tests/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/CMakeLists.txt b/proton-c/src/tests/CMakeLists.txt index fb7df1b..312c94f 100644 --- a/proton-c/src/tests/CMakeLists.txt +++ b/proton-c/src/tests/CMakeLists.txt @@ -45,3 +45,4 @@ pn_add_c_test (c-message-tests message.c) pn_add_c_test (c-engine-tests engine.c) pn_add_c_test (c-parse-url-tests parse-url.c) pn_add_c_test (c-refcount-tests refcount.c) +pn_add_c_test (c-reactor-tests reactor.c) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ebe4e3d3/proton-c/src/tests/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c new file mode 100644 index 0000000..bb93d42 --- /dev/null +++ b/proton-c/src/tests/reactor.c @@ -0,0 +1,320 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/reactor.h> +#include <proton/handlers.h> +#include <proton/connection.h> +#include <proton/event.h> +#include <proton/link.h> +#include <proton/delivery.h> +#include <stdlib.h> + +#define assert(E) ((E) ? 0 : (abort(), 0)) + + +static void test_reactor(void) { + pn_reactor_t *reactor = pn_reactor(); + assert(reactor); + pn_free(reactor); +} + +static void test_reactor_run(void) { + pn_reactor_t *reactor = pn_reactor(); + assert(reactor); + // run should exit if there is nothing left to do + pn_reactor_run(reactor); + pn_free(reactor); +} + +typedef struct { + pn_list_t *events; +} pni_test_handler_t; + +pni_test_handler_t *thmem(pn_handler_t *handler) { + return (pni_test_handler_t *) pn_handler_mem(handler); +} + +void test_dispatch(pn_handler_t *handler, pn_event_t *event) { + pn_list_add(thmem(handler)->events, (void *) pn_event_type(event)); +} + +pn_handler_t *test_handler(pn_list_t *events) { + pn_handler_t *handler = pn_handler_new(test_dispatch, sizeof(pni_test_handler_t), NULL); + thmem(handler)->events = events; + return handler; +} + +#define END PN_EVENT_NONE + +void expect(pn_list_t *events, ...) { + va_list ap; + + va_start(ap, events); + size_t idx = 0; + while (true) { + pn_event_type_t expected = (pn_event_type_t) va_arg(ap, int); + if (expected == END) { + assert(idx == pn_list_size(events)); + break; + } + assert(idx < pn_list_size(events)); + pn_event_type_t actual = (pn_event_type_t) pn_list_get(events, idx++); + assert(expected == actual); + } + va_end(ap); +} + +static void test_reactor_handler(void) { + pn_reactor_t *reactor = pn_reactor(); + assert(reactor); + pn_handler_t *handler = pn_reactor_handler(reactor); + assert(handler); + pn_list_t *events = pn_list(PN_VOID, 0); + pn_handler_t *th = test_handler(events); + pn_handler_add(handler, th); + pn_decref(th); + pn_free(reactor); + expect(events, END); + pn_free(events); +} + +static void test_reactor_handler_free(void) { + pn_reactor_t *reactor = pn_reactor(); + assert(reactor); + pn_handler_t *handler = pn_reactor_handler(reactor); + assert(handler); + pn_list_t *events = pn_list(PN_VOID, 0); + pn_handler_add(handler, test_handler(events)); + pn_reactor_free(reactor); + expect(events, END); + pn_free(events); +} + +static void test_reactor_handler_run(void) { + pn_reactor_t *reactor = pn_reactor(); + assert(reactor); + pn_handler_t *handler = pn_reactor_handler(reactor); + assert(handler); + pn_list_t *events = pn_list(PN_VOID, 0); + pn_handler_t *th = test_handler(events); + pn_handler_add(handler, th); + pn_reactor_run(reactor); + expect(events, PN_REACTOR_INIT, PN_REACTOR_FINAL, END); + pn_free(reactor); + pn_free(th); + pn_free(events); +} + +static void test_reactor_handler_run_free(void) { + pn_reactor_t *reactor = pn_reactor(); + assert(reactor); + pn_handler_t *handler = pn_reactor_handler(reactor); + assert(handler); + pn_list_t *events = pn_list(PN_VOID, 0); + pn_handler_add(handler, test_handler(events)); + pn_reactor_run(reactor); + expect(events, PN_REACTOR_INIT, PN_REACTOR_FINAL, END); + pn_reactor_free(reactor); + pn_free(events); +} + +static void test_reactor_connection(void) { + pn_reactor_t *reactor = pn_reactor(); + assert(reactor); + pn_list_t *cevents = pn_list(PN_VOID, 0); + pn_handler_t *tch = test_handler(cevents); + pn_connection_t *connection = pn_reactor_connection(reactor, tch); + assert(connection); + pn_handler_t *root = pn_reactor_handler(reactor); + pn_list_t *revents = pn_list(PN_VOID, 0); + pn_handler_add(root, test_handler(revents)); + pn_reactor_run(reactor); + expect(revents, PN_REACTOR_INIT, PN_REACTOR_FINAL, END); + expect(cevents, PN_CONNECTION_INIT, END); + pn_reactor_free(reactor); + pn_handler_free(tch); + pn_free(cevents); + pn_free(revents); +} + +static void test_reactor_acceptor(void) { + pn_reactor_t *reactor = pn_reactor(); + assert(reactor); + pn_acceptor_t *acceptor = pn_reactor_acceptor(reactor, "0.0.0.0", "5672", NULL); + assert(acceptor); + pn_reactor_free(reactor); +} + +pn_acceptor_t **tram(pn_handler_t *h) { + return (pn_acceptor_t **) pn_handler_mem(h); +} + +static void tra_dispatch(pn_handler_t *handler, pn_event_t *event) { + switch (pn_event_type(event)) { + case PN_REACTOR_INIT: + { + pn_acceptor_t *acceptor = *tram(handler); + pn_reactor_t *reactor = (pn_reactor_t *) pn_event_context(event); + pn_acceptor_close(reactor, acceptor); + } + break; + default: + break; + } +} + +static pn_handler_t *tra_handler(pn_acceptor_t *acceptor) { + pn_handler_t *handler = pn_handler_new(tra_dispatch, sizeof(pn_acceptor_t *), NULL); + *tram(handler) = acceptor; + return handler; +} + +static void test_reactor_acceptor_run(void) { + pn_reactor_t *reactor = pn_reactor(); + assert(reactor); + pn_handler_t *root = pn_reactor_handler(reactor); + assert(root); + pn_acceptor_t *acceptor = pn_reactor_acceptor(reactor, "0.0.0.0", "5672", NULL); + assert(acceptor); + pn_handler_add(root, tra_handler(acceptor)); + pn_reactor_run(reactor); + pn_reactor_free(reactor); +} + +typedef struct { + pn_reactor_t *reactor; + pn_acceptor_t *acceptor; + pn_list_t *events; +} server_t; + +static server_t *smem(pn_handler_t *handler) { + return (server_t *) pn_handler_mem(handler); +} + +static void server_dispatch(pn_handler_t *handler, pn_event_t *event) { + server_t *srv = smem(handler); + pn_list_add(srv->events, (void *) pn_event_type(event)); + switch (pn_event_type(event)) { + case PN_CONNECTION_REMOTE_OPEN: + pn_acceptor_close(srv->reactor, srv->acceptor); + pn_connection_close(pn_event_connection(event)); + pn_connection_release(pn_event_connection(event)); + break; + default: + break; + } +} + +typedef struct { + pn_list_t *events; +} client_t; + +static client_t *cmem(pn_handler_t *handler) { + return (client_t *) pn_handler_mem(handler); +} + +static void client_dispatch(pn_handler_t *handler, pn_event_t *event) { + client_t *cli = cmem(handler); + pn_list_add(cli->events, (void *) pn_event_type(event)); + pn_connection_t *conn = pn_event_connection(event); + switch (pn_event_type(event)) { + case PN_CONNECTION_INIT: + pn_connection_set_hostname(conn, "localhost:5672"); + pn_connection_open(conn); + break; + case PN_CONNECTION_REMOTE_CLOSE: + pn_connection_close(conn); + pn_connection_release(conn); + break; + default: + break; + } +} + +static void test_reactor_connect(void) { + pn_reactor_t *reactor = pn_reactor(); + pn_handler_t *sh = pn_handler_new(server_dispatch, sizeof(server_t), NULL); + server_t *srv = smem(sh); + pn_acceptor_t *acceptor = pn_reactor_acceptor(reactor, "0.0.0.0", "5672", sh); + srv->reactor = reactor; + srv->acceptor = acceptor; + srv->events = pn_list(PN_VOID, 0); + pn_decref(sh); + pn_handler_t *ch = pn_handler_new(client_dispatch, sizeof(client_t), NULL); + client_t *cli = cmem(ch); + cli->events = pn_list(PN_VOID, 0); + pn_reactor_connection(reactor, ch); + pn_reactor_run(reactor); + expect(srv->events, PN_CONNECTION_INIT, PN_CONNECTION_BOUND, + PN_CONNECTION_REMOTE_OPEN, PN_CONNECTION_LOCAL_CLOSE, + PN_CONNECTION_REMOTE_CLOSE, PN_CONNECTION_UNBOUND, + PN_CONNECTION_FINAL, END); + pn_free(srv->events); + expect(cli->events, PN_CONNECTION_INIT, PN_CONNECTION_LOCAL_OPEN, + PN_CONNECTION_BOUND, PN_CONNECTION_REMOTE_OPEN, + PN_CONNECTION_REMOTE_CLOSE, PN_CONNECTION_LOCAL_CLOSE, + PN_CONNECTION_UNBOUND, PN_CONNECTION_FINAL, END); + pn_free(cli->events); + pn_decref(ch); + pn_reactor_free(reactor); +} + +/* +void dispatch(pn_handler_t *handler, pn_event_t *event) { + pn_delivery_t *dlv = pn_event_delivery(event); + switch (pn_event_type(event)) { + case PN_DELIVERY: + if (!pn_delivery_partial(dlv)) { + pn_delivery_settle(dlv); + } + break; + default: + break; + } +} + +static void test_reactor_flow(void) { + pn_reactor_t *reactor = pn_reactor(); + assert(reactor); + pn_handler_t *root = pn_reactor_handler(reactor); + assert(root); + pn_handler_add(root, pn_handler(dispatch)); + pn_handler_add(root, pn_handler_cast(pn_handshaker())); + pn_handler_add(root, pn_handler_cast(pn_flowcontroller(4*1024))); + pn_reactor_acceptor(reactor, "0.0.0.0", "5672", NULL); + pn_reactor_run(reactor); + pn_reactor_free(reactor); + }*/ + +int main(int argc, char **argv) +{ + test_reactor(); + test_reactor_run(); + test_reactor_handler(); + test_reactor_handler_free(); + test_reactor_handler_run(); + test_reactor_handler_run_free(); + test_reactor_connection(); + test_reactor_acceptor(); + test_reactor_acceptor_run(); + test_reactor_connect(); + return 0; +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
