http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/include/proton/util.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/util.h b/proton-c/include/proton/util.h deleted file mode 100644 index 70043eb..0000000 --- a/proton-c/include/proton/util.h +++ /dev/null @@ -1,40 +0,0 @@ -#ifndef PROTON_UTIL_H -#define PROTON_UTIL_H 1 - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <proton/import_export.h> -#include <stdarg.h> - -#ifdef __cplusplus -extern "C" { -#endif - -PN_EXTERN void pni_parse_url(char *url, char **scheme, char **user, char **pass, char **host, char **port, char **path); -PN_EXTERN void pn_fatal(const char *fmt, ...); -PN_EXTERN void pn_vfatal(const char *fmt, va_list ap); - -#ifdef __cplusplus -} -#endif - -#endif /* util.h */
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/codec/codec.c ---------------------------------------------------------------------- diff --git a/proton-c/src/codec/codec.c b/proton-c/src/codec/codec.c index 87d7df7..afd0c57 100644 --- a/proton-c/src/codec/codec.c +++ b/proton-c/src/codec/codec.c @@ -22,7 +22,6 @@ #include <proton/object.h> #include <proton/codec.h> #include <proton/error.h> -#include <proton/util.h> #include <assert.h> #include <stdio.h> #include <string.h> @@ -32,9 +31,9 @@ #include "encodings.h" #define DEFINE_FIELDS #include "protocol.h" -#include "../platform.h" -#include "../platform_fmt.h" -#include "../util.h" +#include "platform.h" +#include "platform_fmt.h" +#include "util.h" #include "decoder.h" #include "encoder.h" #include "data.h" @@ -367,7 +366,7 @@ static int pn_data_inspect(void *obj, pn_string_t *dst) pn_data_t *pn_data(size_t capacity) { static const pn_class_t clazz = PN_CLASS(pn_data); - pn_data_t *data = (pn_data_t *) pn_new(sizeof(pn_data_t), &clazz); + pn_data_t *data = (pn_data_t *) pn_class_new(&clazz, sizeof(pn_data_t)); data->capacity = capacity; data->size = 0; data->nodes = capacity ? 
(pni_node_t *) malloc(capacity * sizeof(pni_node_t)) : NULL; @@ -1112,15 +1111,6 @@ int pn_data_resize(pn_data_t *data, size_t size) } -pni_node_t *pn_data_node(pn_data_t *data, pni_nid_t nd) -{ - if (nd) { - return &data->nodes[nd - 1]; - } else { - return NULL; - } -} - size_t pn_data_id(pn_data_t *data, pni_node_t *node) { return node - data->nodes + 1; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/codec/data.h ---------------------------------------------------------------------- diff --git a/proton-c/src/codec/data.h b/proton-c/src/codec/data.h index be1669a..a528d26 100644 --- a/proton-c/src/codec/data.h +++ b/proton-c/src/codec/data.h @@ -61,7 +61,11 @@ struct pn_data_t { pni_nid_t base_current; }; -pni_node_t *pn_data_node(pn_data_t *data, pni_nid_t nd); +inline pni_node_t * pn_data_node(pn_data_t *data, pni_nid_t nd) +{ + return nd ? (data->nodes + nd - 1) : NULL; +} + int pni_data_traverse(pn_data_t *data, int (*enter)(void *ctx, pn_data_t *data, pni_node_t *node), int (*exit)(void *ctx, pn_data_t *data, pni_node_t *node), http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/codec/decoder.c ---------------------------------------------------------------------- diff --git a/proton-c/src/codec/decoder.c b/proton-c/src/codec/decoder.c index 7a01388..2bd4ecc 100644 --- a/proton-c/src/codec/decoder.c +++ b/proton-c/src/codec/decoder.c @@ -55,7 +55,7 @@ static void pn_decoder_finalize(void *obj) { pn_decoder_t *pn_decoder() { static const pn_class_t clazz = PN_CLASS(pn_decoder); - return (pn_decoder_t *) pn_new(sizeof(pn_decoder_t), &clazz); + return (pn_decoder_t *) pn_class_new(&clazz, sizeof(pn_decoder_t)); } static inline uint8_t pn_decoder_readf8(pn_decoder_t *decoder) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/codec/encoder.c ---------------------------------------------------------------------- diff --git a/proton-c/src/codec/encoder.c 
b/proton-c/src/codec/encoder.c index f0f3cef..4a32183 100644 --- a/proton-c/src/codec/encoder.c +++ b/proton-c/src/codec/encoder.c @@ -57,7 +57,7 @@ static void pn_encoder_finalize(void *obj) { pn_encoder_t *pn_encoder() { static const pn_class_t clazz = PN_CLASS(pn_encoder); - return (pn_encoder_t *) pn_new(sizeof(pn_encoder_t), &clazz); + return (pn_encoder_t *) pn_class_new(&clazz, sizeof(pn_encoder_t)); } static uint8_t pn_type2code(pn_encoder_t *encoder, pn_type_t type) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/dispatcher/dispatcher.c ---------------------------------------------------------------------- diff --git a/proton-c/src/dispatcher/dispatcher.c b/proton-c/src/dispatcher/dispatcher.c index 296c3ab..6368aa5 100644 --- a/proton-c/src/dispatcher/dispatcher.c +++ b/proton-c/src/dispatcher/dispatcher.c @@ -27,8 +27,8 @@ #include <proton/buffer.h> #include "dispatcher.h" #include "protocol.h" -#include "../util.h" -#include "../platform_fmt.h" +#include "util.h" +#include "platform_fmt.h" #include "dispatch_actions.h" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/dispatcher/dispatcher.h ---------------------------------------------------------------------- diff --git a/proton-c/src/dispatcher/dispatcher.h b/proton-c/src/dispatcher/dispatcher.h index a87e383..9ec2dda 100644 --- a/proton-c/src/dispatcher/dispatcher.h +++ b/proton-c/src/dispatcher/dispatcher.h @@ -26,8 +26,9 @@ #ifndef __cplusplus #include <stdbool.h> #endif -#include <proton/buffer.h> -#include <proton/codec.h> +#include "proton/buffer.h" +#include "proton/codec.h" +#include "proton/transport.h" typedef struct pn_dispatcher_t pn_dispatcher_t; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/engine/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h index 03cb630..37e8311 
100644 --- a/proton-c/src/engine/engine-internal.h +++ b/proton-c/src/engine/engine-internal.h @@ -26,8 +26,8 @@ #include <proton/buffer.h> #include <proton/engine.h> #include <proton/types.h> -#include "../dispatcher/dispatcher.h" -#include "../util.h" +#include "dispatcher/dispatcher.h" +#include "util.h" typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER} pn_endpoint_type_t; @@ -127,6 +127,7 @@ struct pn_transport_t { uint32_t local_max_frame; uint32_t remote_max_frame; pn_condition_t remote_condition; + pn_condition_t condition; #define PN_IO_SSL 0 #define PN_IO_SASL 1 @@ -174,6 +175,8 @@ struct pn_transport_t { bool tail_closed; // input stream closed by driver bool head_closed; bool done_processing; // if true, don't call pn_process again + bool posted_head_closed; + bool posted_tail_closed; }; struct pn_connection_t { @@ -250,6 +253,7 @@ struct pn_link_t { uint8_t remote_rcv_settle_mode; bool drain_flag_mode; // receiver only bool drain; + bool detached; }; struct pn_disposition_t { @@ -311,5 +315,10 @@ void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery); void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint); void pn_connection_unbound(pn_connection_t *conn); int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...); +void pn_session_unbound(pn_session_t* ssn); +void pn_link_unbound(pn_link_t* link); + +void pni_close_tail(pn_transport_t *transport); + #endif /* engine-internal.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/engine/engine.c ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c index 02e5009..46bf462 100644 --- a/proton-c/src/engine/engine.c +++ b/proton-c/src/engine/engine.c @@ -28,12 +28,9 @@ #include <stdarg.h> #include <stdio.h> -#include "../sasl/sasl-internal.h" -#include "../ssl/ssl-internal.h" -#include "../platform.h" 
-#include "../platform_fmt.h" -#include "../transport/transport.h" -#include "../engine/event.h" +#include "platform.h" +#include "platform_fmt.h" +#include "transport/transport.h" // endpoints @@ -72,8 +69,8 @@ static void pn_endpoint_open(pn_endpoint_t *endpoint) // TODO: do we care about the current state? PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE); pn_connection_t *conn = pn_ep_get_connection(endpoint); - pn_collector_put(conn->collector, endpoint_event(endpoint->type, true), - endpoint); + pn_collector_put(conn->collector, PN_OBJECT, endpoint, + endpoint_event(endpoint->type, true)); pn_modified(conn, endpoint, true); } @@ -82,8 +79,8 @@ static void pn_endpoint_close(pn_endpoint_t *endpoint) // TODO: do we care about the current state? PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED); pn_connection_t *conn = pn_ep_get_connection(endpoint); - pn_collector_put(conn->collector, endpoint_event(endpoint->type, false), - endpoint); + pn_collector_put(conn->collector, PN_OBJECT, endpoint, + endpoint_event(endpoint->type, false)); pn_modified(conn, endpoint, true); } @@ -190,7 +187,7 @@ void pn_add_session(pn_connection_t *conn, pn_session_t *ssn) { pn_list_add(conn->sessions, ssn); ssn->connection = conn; - pn_incref2(conn, ssn); // keep around until finalized + pn_incref(conn); // keep around until finalized } void pn_remove_session(pn_connection_t *conn, pn_session_t *ssn) @@ -228,7 +225,7 @@ void pn_session_free(pn_session_t *session) pn_endpoint_t *endpoint = (pn_endpoint_t *) session; LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint); session->endpoint.freed = true; - pn_decref2(session, session->connection); + pn_decref(session); } void *pn_session_get_context(pn_session_t *session) @@ -266,6 +263,15 @@ void pn_link_close(pn_link_t *link) pn_endpoint_close(&link->endpoint); } +void pn_link_detach(pn_link_t *link) +{ + assert(link); + link->detached = true; + pn_collector_put(link->session->connection->collector, PN_OBJECT, link, PN_LINK_DETACH); + 
pn_modified(link->session->connection, &link->endpoint, true); + +} + void pn_terminus_free(pn_terminus_t *terminus) { pn_free(terminus->address); @@ -290,10 +296,10 @@ void pn_link_free(pn_link_t *link) while (link->settled_head) { delivery = link->settled_head; LL_POP(link, settled, pn_delivery_t); - pn_decref2(delivery, link); + pn_decref(delivery); } link->endpoint.freed = true; - pn_decref2(link, link->session); + pn_decref(link); } void *pn_link_get_context(pn_link_t *link) @@ -332,14 +338,13 @@ void pn_endpoint_tini(pn_endpoint_t *endpoint) pn_condition_tini(&endpoint->condition); } -#include "event.h" - static bool pni_post_final(pn_endpoint_t *endpoint, pn_event_type_t type) { pn_connection_t *conn = pn_ep_get_connection(endpoint); if (!endpoint->posted_final) { endpoint->posted_final = true; - pn_event_t *event = pn_collector_put(conn->collector, type, endpoint); + pn_event_t *event = pn_collector_put(conn->collector, PN_OBJECT, endpoint, + type); if (event) { return true; } } @@ -355,7 +360,7 @@ static void pn_connection_finalize(void *object) return; } - pn_decref2(conn->collector, conn); + pn_decref(conn->collector); pn_free(conn->sessions); pn_free(conn->container); pn_free(conn->hostname); @@ -373,7 +378,7 @@ static void pn_connection_finalize(void *object) pn_connection_t *pn_connection() { static const pn_class_t clazz = PN_CLASS(pn_connection); - pn_connection_t *conn = (pn_connection_t *) pn_new(sizeof(pn_connection_t), &clazz); + pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t)); if (!conn) return NULL; conn->context = NULL; @@ -382,7 +387,7 @@ pn_connection_t *pn_connection() pn_endpoint_init(&conn->endpoint, CONNECTION, conn); conn->transport_head = NULL; conn->transport_tail = NULL; - conn->sessions = pn_list(0, 0); + conn->sessions = pn_list(PN_WEAKREF, 0); conn->transport = NULL; conn->work_head = NULL; conn->work_tail = NULL; @@ -406,12 +411,12 @@ static const pn_event_type_t 
endpoint_init_event_map[] = { void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector) { - pn_decref2(connection->collector, connection); + pn_decref(connection->collector); connection->collector = collector; - pn_incref2(connection->collector, connection); + pn_incref(connection->collector); pn_endpoint_t *endpoint = connection->endpoint_head; while (endpoint) { - pn_collector_put(connection->collector, endpoint_init_event_map[endpoint->type], endpoint); + pn_collector_put(connection->collector, PN_OBJECT, endpoint, endpoint_init_event_map[endpoint->type]); endpoint = endpoint->endpoint_next; } } @@ -561,7 +566,7 @@ void pn_add_tpwork(pn_delivery_t *delivery) { LL_ADD(connection, tpwork, delivery); delivery->tpwork = true; - pn_incref2(delivery, connection); + pn_incref(delivery); } pn_modified(connection, &connection->endpoint, true); } @@ -573,7 +578,7 @@ void pn_clear_tpwork(pn_delivery_t *delivery) { LL_REMOVE(connection, tpwork, delivery); delivery->tpwork = false; - pn_decref2(delivery, connection); // may free delivery! + pn_decref(delivery); // may free delivery! } } @@ -595,12 +600,12 @@ void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit if (!endpoint->modified) { LL_ADD(connection, transport, endpoint); endpoint->modified = true; - pn_incref2(endpoint, connection); + pn_incref(endpoint); } if (emit && connection->transport) { - pn_collector_put(connection->collector, PN_TRANSPORT, - connection->transport); + pn_collector_put(connection->collector, PN_OBJECT, connection->transport, + PN_TRANSPORT); } } @@ -611,7 +616,7 @@ void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint) endpoint->transport_next = NULL; endpoint->transport_prev = NULL; endpoint->modified = false; - pn_decref2(endpoint, connection); // may free endpoint! + pn_decref(endpoint); // may free endpoint! 
} } @@ -709,7 +714,7 @@ static void pn_session_finalize(void *object) pn_delivery_map_free(&session->state.outgoing); pn_free(session->state.local_handles); pn_free(session->state.remote_handles); - pn_decref2(session->connection, session); + pn_decref(session->connection); } #define pn_session_initialize NULL @@ -721,12 +726,12 @@ pn_session_t *pn_session(pn_connection_t *conn) { assert(conn); static const pn_class_t clazz = PN_CLASS(pn_session); - pn_session_t *ssn = (pn_session_t *) pn_new2(sizeof(pn_session_t), &clazz, conn); + pn_session_t *ssn = (pn_session_t *) pn_class_new(&clazz, sizeof(pn_session_t)); if (!ssn) return NULL; pn_endpoint_init(&ssn->endpoint, SESSION, conn); pn_add_session(conn, ssn); - ssn->links = pn_list(0, 0); + ssn->links = pn_list(PN_WEAKREF, 0); ssn->context = 0; ssn->incoming_capacity = 1024*1024; ssn->incoming_bytes = 0; @@ -740,14 +745,25 @@ pn_session_t *pn_session(pn_connection_t *conn) ssn->state.remote_channel = (uint16_t)-1; pn_delivery_map_init(&ssn->state.incoming, 0); pn_delivery_map_init(&ssn->state.outgoing, 0); - ssn->state.local_handles = pn_hash(0, 0.75, PN_REFCOUNT); - ssn->state.remote_handles = pn_hash(0, 0.75, PN_REFCOUNT); + ssn->state.local_handles = pn_hash(PN_OBJECT, 0, 0.75); + ssn->state.remote_handles = pn_hash(PN_OBJECT, 0, 0.75); // end transport state - pn_collector_put(conn->collector, PN_SESSION_INIT, ssn); + pn_collector_put(conn->collector, PN_OBJECT, ssn, PN_SESSION_INIT); return ssn; } +void pn_session_unbound(pn_session_t* ssn) +{ + assert(ssn); + ssn->state.local_channel = (uint16_t)-1; + ssn->state.remote_channel = (uint16_t)-1; + ssn->incoming_bytes = 0; + ssn->outgoing_bytes = 0; + ssn->incoming_deliveries = 0; + ssn->outgoing_deliveries = 0; +} + size_t pn_session_get_incoming_capacity(pn_session_t *ssn) { assert(ssn); @@ -817,7 +833,7 @@ static void pn_link_finalize(void *object) pn_terminus_free(&link->remote_target); pn_free(link->name); pn_endpoint_tini(endpoint); - 
pn_decref2(link->session, link); + pn_decref(link->session); } #define pn_link_initialize NULL @@ -828,11 +844,11 @@ static void pn_link_finalize(void *object) pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) { static const pn_class_t clazz = PN_CLASS(pn_link); - pn_link_t *link = (pn_link_t *) pn_new2(sizeof(pn_link_t), &clazz, session); + pn_link_t *link = (pn_link_t *) pn_class_new(&clazz, sizeof(pn_link_t)); pn_endpoint_init(&link->endpoint, type, session->connection); pn_add_link(session, link); - pn_incref2(session, link); // keep session until link finalized + pn_incref(session); // keep session until link finalized link->name = pn_string(name); pn_terminus_init(&link->source, PN_SOURCE); pn_terminus_init(&link->target, PN_TARGET); @@ -852,6 +868,7 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) link->rcv_settle_mode = PN_RCV_FIRST; link->remote_snd_settle_mode = PN_SND_MIXED; link->remote_rcv_settle_mode = PN_RCV_FIRST; + link->detached = false; // begin transport state link->state.local_handle = -1; @@ -860,10 +877,19 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) link->state.link_credit = 0; // end transport state - pn_collector_put(session->connection->collector, PN_LINK_INIT, link); + pn_collector_put(session->connection->collector, PN_OBJECT, link, PN_LINK_INIT); return link; } +void pn_link_unbound(pn_link_t* link) +{ + assert(link); + link->state.local_handle = -1; + link->state.remote_handle = -1; + link->state.delivery_count = 0; + link->state.link_credit = 0; +} + pn_terminus_t *pn_link_source(pn_link_t *link) { return link ? 
&link->source : NULL; @@ -1072,7 +1098,7 @@ static void pn_delivery_finalize(void *object) pn_buffer_free(delivery->bytes); pn_disposition_finalize(&delivery->local); pn_disposition_finalize(&delivery->remote); - pn_decref2(delivery->link, delivery); + pn_decref(delivery->link); } static void pn_disposition_init(pn_disposition_t *ds) @@ -1107,10 +1133,10 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag) LL_POP(link, settled, pn_delivery_t); if (!delivery) { static const pn_class_t clazz = PN_CLASS(pn_delivery); - delivery = (pn_delivery_t *) pn_new2(sizeof(pn_delivery_t), &clazz, link); + delivery = (pn_delivery_t *) pn_class_new(&clazz, sizeof(pn_delivery_t)); if (!delivery) return NULL; delivery->link = link; - pn_incref2(delivery->link, delivery); // keep link until finalized + pn_incref(delivery->link); // keep link until finalized delivery->tag = pn_buffer(16); delivery->bytes = pn_buffer(64); pn_disposition_init(&delivery->local); @@ -1734,3 +1760,77 @@ int pn_condition_redirect_port(pn_condition_t *condition) pn_data_rewind(data); return port; } + +pn_connection_t *pn_event_connection(pn_event_t *event) +{ + pn_session_t *ssn; + pn_transport_t *transport; + + switch (pn_class_id(pn_event_class(event))) { + case CID_pn_connection: + return (pn_connection_t *) pn_event_context(event); + case CID_pn_transport: + transport = pn_event_transport(event); + if (transport) + return transport->connection; + return NULL; + default: + ssn = pn_event_session(event); + if (ssn) + return pn_session_connection(ssn); + } + return NULL; +} + +pn_session_t *pn_event_session(pn_event_t *event) +{ + pn_link_t *link; + switch (pn_class_id(pn_event_class(event))) { + case CID_pn_session: + return (pn_session_t *) pn_event_context(event); + default: + link = pn_event_link(event); + if (link) + return pn_link_session(link); + } + return NULL; +} + +pn_link_t *pn_event_link(pn_event_t *event) +{ + pn_delivery_t *dlv; + switch (pn_class_id(pn_event_class(event))) { 
+ case CID_pn_link: + return (pn_link_t *) pn_event_context(event); + default: + dlv = pn_event_delivery(event); + if (dlv) + return pn_delivery_link(dlv); + } + return NULL; +} + +pn_delivery_t *pn_event_delivery(pn_event_t *event) +{ + switch (pn_class_id(pn_event_class(event))) { + case CID_pn_delivery: + return (pn_delivery_t *) pn_event_context(event); + default: + return NULL; + } +} + +pn_transport_t *pn_event_transport(pn_event_t *event) +{ + switch (pn_class_id(pn_event_class(event))) { + case CID_pn_transport: + return (pn_transport_t *) pn_event_context(event); + default: + { + pn_connection_t *conn = pn_event_connection(event); + if (conn) + return pn_connection_transport(conn); + return NULL; + } + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/engine/event.c ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/event.c b/proton-c/src/engine/event.c deleted file mode 100644 index 07e3cb5..0000000 --- a/proton-c/src/engine/event.c +++ /dev/null @@ -1,348 +0,0 @@ -#include <proton/engine.h> -#include <assert.h> -#include "engine-internal.h" - -struct pn_collector_t { - pn_event_t *head; - pn_event_t *tail; - pn_event_t *free_head; - bool freed; -}; - -struct pn_event_t { - void *context; // depends on type - pn_event_t *next; - pn_event_type_t type; -}; - -static void pn_collector_initialize(void *obj) -{ - pn_collector_t *collector = (pn_collector_t *) obj; - collector->head = NULL; - collector->tail = NULL; - collector->free_head = NULL; - collector->freed = false; -} - -static void pn_collector_drain(pn_collector_t *collector) -{ - while (pn_collector_peek(collector)) { - pn_collector_pop(collector); - } - - assert(!collector->head); - assert(!collector->tail); -} - -static void pn_collector_shrink(pn_collector_t *collector) -{ - pn_event_t *event = collector->free_head; - while (event) { - pn_event_t *next = event->next; - pn_free(event); - event = next; - } - - 
collector->free_head = NULL; -} - -static void pn_collector_finalize(void *obj) -{ - pn_collector_t *collector = (pn_collector_t *) obj; - pn_collector_drain(collector); - pn_collector_shrink(collector); -} - -static int pn_collector_inspect(void *obj, pn_string_t *dst) -{ - assert(obj); - pn_collector_t *collector = (pn_collector_t *) obj; - int err = pn_string_addf(dst, "EVENTS["); - if (err) return err; - pn_event_t *event = collector->head; - bool first = true; - while (event) { - if (first) { - first = false; - } else { - err = pn_string_addf(dst, ", "); - if (err) return err; - } - err = pn_inspect(event, dst); - if (err) return err; - event = event->next; - } - return pn_string_addf(dst, "]"); -} - -#define pn_collector_hashcode NULL -#define pn_collector_compare NULL - -pn_collector_t *pn_collector(void) -{ - static const pn_class_t clazz = PN_CLASS(pn_collector); - pn_collector_t *collector = (pn_collector_t *) pn_new(sizeof(pn_collector_t), &clazz); - return collector; -} - -void pn_collector_free(pn_collector_t *collector) -{ - collector->freed = true; - pn_collector_drain(collector); - pn_collector_shrink(collector); - pn_decref(collector); -} - -pn_event_t *pn_event(void); -static void pn_event_initialize(void *obj); - -pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type, void *context) -{ - if (!collector) { - return NULL; - } - - assert(context); - - if (collector->freed) { - return NULL; - } - - pn_event_t *tail = collector->tail; - if (tail && tail->type == type && tail->context == context) { - return NULL; - } - - pn_event_t *event; - - if (collector->free_head) { - event = collector->free_head; - collector->free_head = collector->free_head->next; - pn_event_initialize(event); - } else { - event = pn_event(); - } - - if (tail) { - tail->next = event; - collector->tail = event; - } else { - collector->tail = event; - collector->head = event; - } - - event->type = type; - event->context = context; - 
pn_incref2(event->context, collector); - - //printf("event %s on %p\n", pn_event_type_name(event->type), event->context); - - return event; -} - -pn_event_t *pn_collector_peek(pn_collector_t *collector) -{ - return collector->head; -} - -bool pn_collector_pop(pn_collector_t *collector) -{ - pn_event_t *event = collector->head; - if (event) { - collector->head = event->next; - } else { - return false; - } - - if (!collector->head) { - collector->tail = NULL; - } - - // decref before adding to the free list - if (event->context) { - pn_decref2(event->context, collector); - event->context = NULL; - } - - event->next = collector->free_head; - collector->free_head = event; - - return true; -} - -static void pn_event_initialize(void *obj) -{ - pn_event_t *event = (pn_event_t *) obj; - event->type = PN_EVENT_NONE; - event->context = NULL; - event->next = NULL; -} - -static void pn_event_finalize(void *obj) {} - -static int pn_event_inspect(void *obj, pn_string_t *dst) -{ - assert(obj); - pn_event_t *event = (pn_event_t *) obj; - int err = pn_string_addf(dst, "(0x%X", (unsigned int)event->type); - if (event->context) { - err = pn_string_addf(dst, ", "); - if (err) return err; - err = pn_inspect(event->context, dst); - if (err) return err; - } - - return pn_string_addf(dst, ")"); -} - -#define pn_event_hashcode NULL -#define pn_event_compare NULL - -pn_event_t *pn_event(void) -{ - static const pn_class_t clazz = PN_CLASS(pn_event); - pn_event_t *event = (pn_event_t *) pn_new(sizeof(pn_event_t), &clazz); - return event; -} - -pn_event_type_t pn_event_type(pn_event_t *event) -{ - return event->type; -} - -pn_event_category_t pn_event_category(pn_event_t *event) -{ - return (pn_event_category_t)(event->type & 0xFFFF0000); -} - -void *pn_event_context(pn_event_t *event) -{ - assert(event); - return event->context; -} - -pn_connection_t *pn_event_connection(pn_event_t *event) -{ - pn_session_t *ssn; - pn_transport_t *transport; - - switch (pn_event_category(event)) { - case 
PN_EVENT_CATEGORY_CONNECTION: - return (pn_connection_t *)event->context; - case PN_EVENT_CATEGORY_TRANSPORT: - transport = pn_event_transport(event); - if (transport) - return transport->connection; - return NULL; - default: - ssn = pn_event_session(event); - if (ssn) - return pn_session_connection(ssn); - } - return NULL; -} - -pn_session_t *pn_event_session(pn_event_t *event) -{ - pn_link_t *link; - switch (pn_event_category(event)) { - case PN_EVENT_CATEGORY_SESSION: - return (pn_session_t *)event->context; - default: - link = pn_event_link(event); - if (link) - return pn_link_session(link); - } - return NULL; -} - -pn_link_t *pn_event_link(pn_event_t *event) -{ - pn_delivery_t *dlv; - switch (pn_event_category(event)) { - case PN_EVENT_CATEGORY_LINK: - return (pn_link_t *)event->context; - default: - dlv = pn_event_delivery(event); - if (dlv) - return pn_delivery_link(dlv); - } - return NULL; -} - -pn_delivery_t *pn_event_delivery(pn_event_t *event) -{ - switch (pn_event_category(event)) { - case PN_EVENT_CATEGORY_DELIVERY: - return (pn_delivery_t *)event->context; - default: - return NULL; - } -} - -pn_transport_t *pn_event_transport(pn_event_t *event) -{ - switch (pn_event_category(event)) { - case PN_EVENT_CATEGORY_TRANSPORT: - return (pn_transport_t *)event->context; - default: - { - pn_connection_t *conn = pn_event_connection(event); - if (conn) - return pn_connection_transport(conn); - return NULL; - } - } -} - -const char *pn_event_type_name(pn_event_type_t type) -{ - switch (type) { - case PN_EVENT_NONE: - return "PN_EVENT_NONE"; - case PN_CONNECTION_INIT: - return "PN_CONNECTION_INIT"; - case PN_CONNECTION_REMOTE_OPEN: - return "PN_CONNECTION_REMOTE_OPEN"; - case PN_CONNECTION_OPEN: - return "PN_CONNECTION_OPEN"; - case PN_CONNECTION_REMOTE_CLOSE: - return "PN_CONNECTION_REMOTE_CLOSE"; - case PN_CONNECTION_CLOSE: - return "PN_CONNECTION_CLOSE"; - case PN_CONNECTION_FINAL: - return "PN_CONNECTION_FINAL"; - case PN_SESSION_INIT: - return 
"PN_SESSION_INIT"; - case PN_SESSION_REMOTE_OPEN: - return "PN_SESSION_REMOTE_OPEN"; - case PN_SESSION_OPEN: - return "PN_SESSION_OPEN"; - case PN_SESSION_REMOTE_CLOSE: - return "PN_SESSION_REMOTE_CLOSE"; - case PN_SESSION_CLOSE: - return "PN_SESSION_CLOSE"; - case PN_SESSION_FINAL: - return "PN_SESSION_FINAL"; - case PN_LINK_INIT: - return "PN_LINK_INIT"; - case PN_LINK_REMOTE_OPEN: - return "PN_LINK_REMOTE_OPEN"; - case PN_LINK_OPEN: - return "PN_LINK_OPEN"; - case PN_LINK_REMOTE_CLOSE: - return "PN_LINK_REMOTE_CLOSE"; - case PN_LINK_CLOSE: - return "PN_LINK_CLOSE"; - case PN_LINK_FLOW: - return "PN_LINK_FLOW"; - case PN_LINK_FINAL: - return "PN_LINK_FINAL"; - case PN_DELIVERY: - return "PN_DELIVERY"; - case PN_TRANSPORT: - return "PN_TRANSPORT"; - } - - return "<unrecognized>"; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/engine/event.h ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/event.h b/proton-c/src/engine/event.h deleted file mode 100644 index b05f2d0..0000000 --- a/proton-c/src/engine/event.h +++ /dev/null @@ -1,28 +0,0 @@ -#ifndef _PROTON_EVENT_H -#define _PROTON_EVENT_H 1 - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type, - void *context); - -#endif /* event.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/events/event.c ---------------------------------------------------------------------- diff --git a/proton-c/src/events/event.c b/proton-c/src/events/event.c new file mode 100644 index 0000000..95aeb03 --- /dev/null +++ b/proton-c/src/events/event.c @@ -0,0 +1,298 @@ +#include <proton/object.h> +#include <proton/event.h> +#include <assert.h> + +struct pn_collector_t { + pn_event_t *head; + pn_event_t *tail; + pn_event_t *free_head; + bool freed; +}; + +struct pn_event_t { + const pn_class_t *clazz; + void *context; // depends on type + pn_event_t *next; + pn_event_type_t type; +}; + +static void pn_collector_initialize(void *obj) +{ + pn_collector_t *collector = (pn_collector_t *) obj; + collector->head = NULL; + collector->tail = NULL; + collector->free_head = NULL; + collector->freed = false; +} + +static void pn_collector_drain(pn_collector_t *collector) +{ + while (pn_collector_peek(collector)) { + pn_collector_pop(collector); + } + + assert(!collector->head); + assert(!collector->tail); +} + +static void pn_collector_shrink(pn_collector_t *collector) +{ + pn_event_t *event = collector->free_head; + while (event) { + pn_event_t *next = event->next; + pn_free(event); + event = next; + } + + collector->free_head = NULL; +} + +static void pn_collector_finalize(void *obj) +{ + pn_collector_t *collector = (pn_collector_t *) obj; + pn_collector_drain(collector); + pn_collector_shrink(collector); +} + +static int pn_collector_inspect(void *obj, pn_string_t *dst) +{ + assert(obj); + pn_collector_t *collector = (pn_collector_t *) obj; + int err = pn_string_addf(dst, "EVENTS["); + if (err) return err; + pn_event_t *event = collector->head; + bool first = true; 
+ while (event) { + if (first) { + first = false; + } else { + err = pn_string_addf(dst, ", "); + if (err) return err; + } + err = pn_inspect(event, dst); + if (err) return err; + event = event->next; + } + return pn_string_addf(dst, "]"); +} + +#define pn_collector_hashcode NULL +#define pn_collector_compare NULL + +pn_collector_t *pn_collector(void) +{ + static const pn_class_t clazz = PN_CLASS(pn_collector); + pn_collector_t *collector = (pn_collector_t *) pn_class_new(&clazz, sizeof(pn_collector_t)); + return collector; +} + +void pn_collector_free(pn_collector_t *collector) +{ + collector->freed = true; + pn_collector_drain(collector); + pn_collector_shrink(collector); + pn_class_decref(PN_OBJECT, collector); +} + +pn_event_t *pn_event(void); +static void pn_event_initialize(void *obj); + +pn_event_t *pn_collector_put(pn_collector_t *collector, + const pn_class_t *clazz, void *context, + pn_event_type_t type) +{ + if (!collector) { + return NULL; + } + + assert(context); + + if (collector->freed) { + return NULL; + } + + pn_event_t *tail = collector->tail; + if (tail && tail->type == type && tail->context == context) { + return NULL; + } + + clazz = clazz->reify(context); + + pn_event_t *event; + + if (collector->free_head) { + event = collector->free_head; + collector->free_head = collector->free_head->next; + pn_event_initialize(event); + } else { + event = pn_event(); + } + + if (tail) { + tail->next = event; + collector->tail = event; + } else { + collector->tail = event; + collector->head = event; + } + + event->clazz = clazz; + event->context = context; + event->type = type; + pn_class_incref(clazz, event->context); + + //printf("event %s on %p\n", pn_event_type_name(event->type), event->context); + + return event; +} + +pn_event_t *pn_collector_peek(pn_collector_t *collector) +{ + return collector->head; +} + +bool pn_collector_pop(pn_collector_t *collector) +{ + pn_event_t *event = collector->head; + if (event) { + collector->head = event->next; + } 
else { + return false; + } + + if (!collector->head) { + collector->tail = NULL; + } + + // decref before adding to the free list + if (event->context) { + pn_class_decref(event->clazz, event->context); + event->context = NULL; + } + + event->next = collector->free_head; + collector->free_head = event; + + return true; +} + +static void pn_event_initialize(void *obj) +{ + pn_event_t *event = (pn_event_t *) obj; + event->type = PN_EVENT_NONE; + event->clazz = NULL; + event->context = NULL; + event->next = NULL; +} + +static void pn_event_finalize(void *obj) {} + +static int pn_event_inspect(void *obj, pn_string_t *dst) +{ + assert(obj); + pn_event_t *event = (pn_event_t *) obj; + int err = pn_string_addf(dst, "(0x%X", (unsigned int)event->type); + if (event->context) { + err = pn_string_addf(dst, ", "); + if (err) return err; + err = pn_class_inspect(event->clazz, event->context, dst); + if (err) return err; + } + + return pn_string_addf(dst, ")"); +} + +#define pn_event_hashcode NULL +#define pn_event_compare NULL + +pn_event_t *pn_event(void) +{ + static const pn_class_t clazz = PN_CLASS(pn_event); + pn_event_t *event = (pn_event_t *) pn_class_new(&clazz, sizeof(pn_event_t)); + return event; +} + +pn_event_type_t pn_event_type(pn_event_t *event) +{ + return event->type; +} + +const pn_class_t *pn_event_class(pn_event_t *event) +{ + assert(event); + return event->clazz; +} + +void *pn_event_context(pn_event_t *event) +{ + assert(event); + return event->context; +} + +const char *pn_event_type_name(pn_event_type_t type) +{ + switch (type) { + case PN_EVENT_NONE: + return "PN_EVENT_NONE"; + case PN_CONNECTION_INIT: + return "PN_CONNECTION_INIT"; + case PN_CONNECTION_BOUND: + return "PN_CONNECTION_BOUND"; + case PN_CONNECTION_UNBOUND: + return "PN_CONNECTION_UNBOUND"; + case PN_CONNECTION_REMOTE_OPEN: + return "PN_CONNECTION_REMOTE_OPEN"; + case PN_CONNECTION_OPEN: + return "PN_CONNECTION_OPEN"; + case PN_CONNECTION_REMOTE_CLOSE: + return "PN_CONNECTION_REMOTE_CLOSE"; 
+ case PN_CONNECTION_CLOSE: + return "PN_CONNECTION_CLOSE"; + case PN_CONNECTION_FINAL: + return "PN_CONNECTION_FINAL"; + case PN_SESSION_INIT: + return "PN_SESSION_INIT"; + case PN_SESSION_REMOTE_OPEN: + return "PN_SESSION_REMOTE_OPEN"; + case PN_SESSION_OPEN: + return "PN_SESSION_OPEN"; + case PN_SESSION_REMOTE_CLOSE: + return "PN_SESSION_REMOTE_CLOSE"; + case PN_SESSION_CLOSE: + return "PN_SESSION_CLOSE"; + case PN_SESSION_FINAL: + return "PN_SESSION_FINAL"; + case PN_LINK_INIT: + return "PN_LINK_INIT"; + case PN_LINK_REMOTE_OPEN: + return "PN_LINK_REMOTE_OPEN"; + case PN_LINK_OPEN: + return "PN_LINK_OPEN"; + case PN_LINK_REMOTE_CLOSE: + return "PN_LINK_REMOTE_CLOSE"; + case PN_LINK_DETACH: + return "PN_LINK_DETACH"; + case PN_LINK_REMOTE_DETACH: + return "PN_LINK_REMOTE_DETACH"; + case PN_LINK_CLOSE: + return "PN_LINK_CLOSE"; + case PN_LINK_FLOW: + return "PN_LINK_FLOW"; + case PN_LINK_FINAL: + return "PN_LINK_FINAL"; + case PN_DELIVERY: + return "PN_DELIVERY"; + case PN_TRANSPORT: + return "PN_TRANSPORT"; + case PN_TRANSPORT_ERROR: + return "PN_TRANSPORT_ERROR"; + case PN_TRANSPORT_HEAD_CLOSED: + return "PN_TRANSPORT_HEAD_CLOSED"; + case PN_TRANSPORT_TAIL_CLOSED: + return "PN_TRANSPORT_TAIL_CLOSED"; + case PN_TRANSPORT_CLOSED: + return "PN_TRANSPORT_CLOSED"; + } + + return "<unrecognized>"; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/message/message.c ---------------------------------------------------------------------- diff --git a/proton-c/src/message/message.c b/proton-c/src/message/message.c index d91ab63..c158345 100644 --- a/proton-c/src/message/message.c +++ b/proton-c/src/message/message.c @@ -29,8 +29,8 @@ #include <stdio.h> #include <assert.h> #include "protocol.h" -#include "../util.h" -#include "../platform_fmt.h" +#include "util.h" +#include "platform_fmt.h" ssize_t pn_message_data(char *dst, size_t available, const char *src, size_t size) { @@ -322,7 +322,7 @@ int pn_message_inspect(void *obj, pn_string_t 
*dst) pn_message_t *pn_message() { static const pn_class_t clazz = PN_CLASS(pn_message); - pn_message_t *msg = (pn_message_t *) pn_new(sizeof(pn_message_t), &clazz); + pn_message_t *msg = (pn_message_t *) pn_class_new(&clazz, sizeof(pn_message_t)); msg->durable = false; msg->priority = PN_DEFAULT_PRIORITY; msg->ttl = 0; @@ -975,6 +975,7 @@ int pn_message_save_data(pn_message_t *msg, char *data, size_t *size) pn_data_error(msg->body)); if (scanned) { if (bytes.size > *size) { + *size = bytes.size; return PN_OVERFLOW; } else { memcpy(data, bytes.start, bytes.size); @@ -997,6 +998,7 @@ int pn_message_save_text(pn_message_t *msg, char *data, size_t *size) { pn_bytes_t str = pn_data_get_bytes(msg->body); if (str.size >= *size) { + *size = str.size; return PN_OVERFLOW; } else { memcpy(data, str.start, str.size); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/messenger/messenger.c ---------------------------------------------------------------------- diff --git a/proton-c/src/messenger/messenger.c b/proton-c/src/messenger/messenger.c index 0e2488b..f0204b9 100644 --- a/proton-c/src/messenger/messenger.c +++ b/proton-c/src/messenger/messenger.c @@ -20,10 +20,13 @@ */ #include <proton/messenger.h> -#include <proton/sasl.h> -#include <proton/ssl.h> -#include <proton/util.h> + +#include <proton/connection.h> +#include <proton/delivery.h> +#include <proton/event.h> #include <proton/object.h> +#include <proton/sasl.h> +#include <proton/session.h> #include <proton/selector.h> #include <assert.h> @@ -32,13 +35,13 @@ #include <string.h> #include <stdio.h> -#include "../util.h" -#include "../platform.h" -#include "../platform_fmt.h" +#include "util.h" +#include "platform.h" +#include "platform_fmt.h" #include "store.h" #include "transform.h" #include "subscription.h" -#include "../selectable.h" +#include "selectable.h" typedef struct pn_link_ctx_t pn_link_ctx_t; @@ -54,10 +57,11 @@ typedef struct { } pn_address_t; // algorithm for granting credit to 
receivers -typedef enum { +typedef enum { // pn_messenger_recv( X ), where: - LINK_CREDIT_EXPLICIT, // X > 0 - LINK_CREDIT_AUTO // X == -1 + LINK_CREDIT_EXPLICIT, // X > 0 + LINK_CREDIT_AUTO, // X == -1 + LINK_CREDIT_MANUAL // X == -2 } pn_link_credit_mode_t; struct pn_messenger_t { @@ -100,6 +104,11 @@ struct pn_messenger_t { int receivers; // # receiver links int draining; // # links in drain state int connection_error; + int flags; + pn_snd_settle_mode_t snd_settle_mode; + pn_rcv_settle_mode_t rcv_settle_mode; + pn_tracer_t tracer; + pn_ssl_verify_mode_t ssl_peer_authentication_mode; bool blocking; bool passive; bool interrupted; @@ -372,10 +381,14 @@ static pn_listener_ctx_t *pn_listener_ctx(pn_messenger_t *messenger, pn_socket_t socket = pn_listen(messenger->io, host, port ? port : default_port(scheme)); if (socket == PN_INVALID_SOCKET) { pn_error_copy(messenger->error, pn_io_error(messenger->io)); + pn_error_format(messenger->error, PN_ERR, "CONNECTION ERROR (%s:%s): %s\n", + messenger->address.host, messenger->address.port, + pn_error_text(messenger->error)); + return NULL; } - pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_new(sizeof(pn_listener_ctx_t), NULL); + pn_listener_ctx_t *ctx = (pn_listener_ctx_t *) pn_class_new(PN_OBJECT, sizeof(pn_listener_ctx_t)); ctx->messenger = messenger; ctx->domain = pn_ssl_domain(PN_SSL_MODE_SERVER); if (messenger->certificate) { @@ -596,7 +609,7 @@ pn_messenger_t *pn_messenger(const char *name) m->blocking = true; m->passive = false; m->io = pn_io(); - m->pending = pn_list(0, 0); + m->pending = pn_list(PN_WEAKREF, 0); m->interruptor = pni_selectable (pni_interruptor_capacity, pni_interruptor_pending, pni_interruptor_deadline, pni_interruptor_readable, @@ -611,8 +624,8 @@ pn_messenger_t *pn_messenger(const char *name) pn_pipe(m->io, m->ctrl); pni_selectable_set_fd(m->interruptor, m->ctrl[0]); pni_selectable_set_context(m->interruptor, m); - m->listeners = pn_list(0, 0); - m->connections = pn_list(0, 0); + m->listeners = 
pn_list(PN_WEAKREF, 0); + m->connections = pn_list(PN_WEAKREF, 0); m->selector = pn_io_selector(m->io); m->collector = pn_collector(); m->credit_mode = LINK_CREDIT_EXPLICIT; @@ -621,13 +634,13 @@ pn_messenger_t *pn_messenger(const char *name) m->distributed = 0; m->receivers = 0; m->draining = 0; - m->credited = pn_list(0, 0); - m->blocked = pn_list(0, 0); + m->credited = pn_list(PN_WEAKREF, 0); + m->blocked = pn_list(PN_WEAKREF, 0); m->next_drain = 0; m->next_tag = 0; m->outgoing = pni_store(); m->incoming = pni_store(); - m->subscriptions = pn_list(0, PN_REFCOUNT); + m->subscriptions = pn_list(PN_OBJECT, 0); m->incoming_subscription = NULL; m->error = pn_error(); m->routes = pn_transform(); @@ -639,6 +652,11 @@ pn_messenger_t *pn_messenger(const char *name) m->rewritten = pn_string(NULL); m->domain = pn_string(NULL); m->connection_error = 0; + m->flags = 0; + m->snd_settle_mode = PN_SND_SETTLED; + m->rcv_settle_mode = PN_RCV_FIRST; + m->tracer = NULL; + m->ssl_peer_authentication_mode = PN_SSL_VERIFY_PEER_NAME; } return m; @@ -840,6 +858,8 @@ bool pn_messenger_flow(pn_messenger_t *messenger) const int used = messenger->distributed + pn_messenger_incoming(messenger); if (max > used) messenger->credit = max - used; + } else if (messenger->credit_mode == LINK_CREDIT_MANUAL) { + return false; } const int batch = per_link_credit(messenger); @@ -896,6 +916,8 @@ static int pn_transport_config(pn_messenger_t *messenger, { pn_connection_ctx_t *ctx = (pn_connection_ctx_t *) pn_connection_get_context(connection); pn_transport_t *transport = pn_connection_transport(connection); + if (messenger->tracer) + pn_transport_set_tracer(transport, messenger->tracer); if (ctx->scheme && !strcmp(ctx->scheme, "amqps")) { pn_ssl_domain_t *d = pn_ssl_domain(PN_SSL_MODE_CLIENT); if (messenger->certificate && messenger->private_key) { @@ -913,7 +935,8 @@ static int pn_transport_config(pn_messenger_t *messenger, pn_error_report("CONNECTION", "invalid certificate db"); return err; } - err = 
pn_ssl_domain_set_peer_authentication(d, PN_SSL_VERIFY_PEER_NAME, NULL); + err = pn_ssl_domain_set_peer_authentication( + d, messenger->ssl_peer_authentication_mode, NULL); if (err) { pn_error_report("CONNECTION", "error configuring ssl to verify peer"); } @@ -985,33 +1008,38 @@ int pni_pump_in(pn_messenger_t *messenger, const char *address, pn_link_t *recei n = pn_link_recv(receiver, encoded + pending, 1); pn_link_advance(receiver); - // account for the used credit - assert( ctx ); - assert( messenger->distributed ); - messenger->distributed--; - pn_link_t *link = receiver; - // replenish if low (< 20% maximum batch) and credit available - if (!pn_link_get_drain(link) && pn_list_size(messenger->blocked) == 0 && messenger->credit > 0) { - const int max = per_link_credit(messenger); - const int lo_thresh = (int)(max * 0.2 + 0.5); - if (pn_link_remote_credit(link) < lo_thresh) { - const int more = pn_min(messenger->credit, max - pn_link_remote_credit(link)); - messenger->credit -= more; - messenger->distributed += more; - pn_link_flow(link, more); + if (messenger->credit_mode != LINK_CREDIT_MANUAL) { + // account for the used credit + assert(ctx); + assert(messenger->distributed); + messenger->distributed--; + + // replenish if low (< 20% maximum batch) and credit available + if (!pn_link_get_drain(link) && pn_list_size(messenger->blocked) == 0 && + messenger->credit > 0) { + const int max = per_link_credit(messenger); + const int lo_thresh = (int)(max * 0.2 + 0.5); + if (pn_link_remote_credit(link) < lo_thresh) { + const int more = + pn_min(messenger->credit, max - pn_link_remote_credit(link)); + messenger->credit -= more; + messenger->distributed += more; + pn_link_flow(link, more); + } } - } - // check if blocked - if (pn_list_index(messenger->blocked, link) < 0 && pn_link_remote_credit(link) == 0) { - pn_list_remove(messenger->credited, link); - if (pn_link_get_drain(link)) { - pn_link_set_drain(link, false); - assert( messenger->draining > 0 ); - 
messenger->draining--; + // check if blocked + if (pn_list_index(messenger->blocked, link) < 0 && + pn_link_remote_credit(link) == 0) { + pn_list_remove(messenger->credited, link); + if (pn_link_get_drain(link)) { + pn_link_set_drain(link, false); + assert(messenger->draining > 0); + messenger->draining--; + } + pn_list_add(messenger->blocked, link); } - pn_list_add(messenger->blocked, link); } if (n != PN_EOS) { @@ -1248,8 +1276,10 @@ int pn_messenger_process_events(pn_messenger_t *messenger) break; case PN_LINK_REMOTE_OPEN: case PN_LINK_REMOTE_CLOSE: + case PN_LINK_REMOTE_DETACH: case PN_LINK_OPEN: case PN_LINK_CLOSE: + case PN_LINK_DETACH: pn_messenger_process_link(messenger, event); break; case PN_LINK_FLOW: @@ -1259,10 +1289,18 @@ int pn_messenger_process_events(pn_messenger_t *messenger) pn_messenger_process_delivery(messenger, event); break; case PN_TRANSPORT: + case PN_TRANSPORT_ERROR: + case PN_TRANSPORT_HEAD_CLOSED: + case PN_TRANSPORT_TAIL_CLOSED: + case PN_TRANSPORT_CLOSED: pn_messenger_process_transport(messenger, event); break; case PN_EVENT_NONE: break; + case PN_CONNECTION_BOUND: + break; + case PN_CONNECTION_UNBOUND: + break; case PN_CONNECTION_FINAL: break; case PN_SESSION_FINAL: @@ -1422,11 +1460,85 @@ int pn_messenger_sync(pn_messenger_t *messenger, bool (*predicate)(pn_messenger_ } } +static void pni_parse(pn_address_t *address); +pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, + const char *address, char **name); +int pn_messenger_work(pn_messenger_t *messenger, int timeout); + int pn_messenger_start(pn_messenger_t *messenger) { if (!messenger) return PN_ARG_ERR; - // right now this is a noop - return 0; + + int error = 0; + + // When checking of routes is required we attempt to resolve each route + // with a substitution that has a defined scheme, address and port. If + // any of theses routes is invalid an appropriate error code will be + // returned. 
Currently no attempt is made to check the name part of the + // address, as the intent here is to fail fast if the addressed host + // is invalid or unavailable. + if (messenger->flags | PN_FLAGS_CHECK_ROUTES) { + pn_list_t *substitutions = pn_list(PN_WEAKREF, 0); + pn_transform_get_substitutions(messenger->routes, substitutions); + for (size_t i = 0; i < pn_list_size(substitutions) && error == 0; i++) { + pn_string_t *substitution = (pn_string_t *)pn_list_get(substitutions, i); + if (substitution) { + pn_address_t addr; + addr.text = pn_string(NULL); + error = pn_string_copy(addr.text, substitution); + if (!error) { + pni_parse(&addr); + if (addr.scheme && strlen(addr.scheme) > 0 && + !strstr(addr.scheme, "$") && addr.host && strlen(addr.host) > 0 && + !strstr(addr.host, "$") && addr.port && strlen(addr.port) > 0 && + !strstr(addr.port, "$")) { + pn_string_t *check_addr = pn_string(NULL); + // ipv6 hosts need to be wrapped in [] within a URI + if (strstr(addr.host, ":")) { + pn_string_format(check_addr, "%s://[%s]:%s/", addr.scheme, + addr.host, addr.port); + } else { + pn_string_format(check_addr, "%s://%s:%s/", addr.scheme, + addr.host, addr.port); + } + char *name = NULL; + pn_connection_t *connection = pn_messenger_resolve( + messenger, pn_string_get(check_addr), &name); + pn_free(check_addr); + if (!connection) { + if (pn_error_code(messenger->error) == 0) + pn_error_copy(messenger->error, pn_io_error(messenger->io)); + pn_error_format(messenger->error, PN_ERR, + "CONNECTION ERROR (%s:%s): %s\n", + messenger->address.host, messenger->address.port, + pn_error_text(messenger->error)); + error = pn_error_code(messenger->error); + } else { + // Send and receive outstanding messages until connection + // completes or an error occurs + int work = pn_messenger_work(messenger, -1); + pn_connection_ctx_t *cctx = + (pn_connection_ctx_t *)pn_connection_get_context(connection); + while ((work > 0 || + (pn_connection_state(connection) & PN_REMOTE_UNINIT) || + 
pni_connection_pending(cctx->selectable) != (ssize_t)0) && + pn_error_code(messenger->error) == 0) + work = pn_messenger_work(messenger, 0); + if (work < 0 && work != PN_TIMEOUT) { + error = work; + } else { + error = pn_error_code(messenger->error); + } + } + } + pn_free(addr.text); + } + } + } + pn_free(substitutions); + } + + return error; } bool pn_messenger_stopped(pn_messenger_t *messenger) @@ -1560,12 +1672,12 @@ pn_connection_t *pn_messenger_resolve(pn_messenger_t *messenger, const char *add return connection; } -pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, bool sender) +PN_EXTERN pn_link_t *pn_messenger_get_link(pn_messenger_t *messenger, + const char *address, bool sender) { char *name = NULL; pn_connection_t *connection = pn_messenger_resolve(messenger, address, &name); if (!connection) return NULL; - pn_connection_ctx_t *cctx = (pn_connection_ctx_t *) pn_connection_get_context(connection); pn_link_t *link = pn_link_head(connection, PN_LOCAL_ACTIVE); while (link) { @@ -1578,6 +1690,22 @@ pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, boo } link = pn_link_next(link, PN_LOCAL_ACTIVE); } + return NULL; +} + +pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, + bool sender, pn_seconds_t timeout) +{ + char *name = NULL; + pn_connection_t *connection = pn_messenger_resolve(messenger, address, &name); + if (!connection) + return NULL; + pn_connection_ctx_t *cctx = + (pn_connection_ctx_t *)pn_connection_get_context(connection); + + pn_link_t *link = pn_messenger_get_link(messenger, address, sender); + if (link) + return link; pn_session_t *ssn = pn_session(connection); pn_session_open(ssn); @@ -1593,9 +1721,9 @@ pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, boo if ((sender && pn_messenger_get_outgoing_window(messenger)) || (!sender && pn_messenger_get_incoming_window(messenger))) { - // use explicit settlement via dispositions (not pre-settled) 
- pn_link_set_snd_settle_mode( link, PN_SND_UNSETTLED ); - pn_link_set_rcv_settle_mode( link, PN_RCV_SECOND ); + // use required settlement (defaults to sending pre-settled messages) + pn_link_set_snd_settle_mode(link, messenger->snd_settle_mode); + pn_link_set_rcv_settle_mode(link, messenger->rcv_settle_mode); } // XXX if (pn_streq(name, "#")) { @@ -1609,6 +1737,14 @@ pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, boo pn_terminus_set_address(pn_link_source(link), name); } link_ctx_setup( messenger, connection, link ); + + if (timeout > 0) { + pn_terminus_set_expiry_policy(pn_link_target(link), PN_EXPIRE_WITH_LINK); + pn_terminus_set_expiry_policy(pn_link_source(link), PN_EXPIRE_WITH_LINK); + pn_terminus_set_timeout(pn_link_target(link), timeout); + pn_terminus_set_timeout(pn_link_source(link), timeout); + } + if (!sender) { pn_link_ctx_t *ctx = (pn_link_ctx_t *)pn_link_get_context(link); assert( ctx ); @@ -1619,18 +1755,27 @@ pn_link_t *pn_messenger_link(pn_messenger_t *messenger, const char *address, boo return link; } -pn_link_t *pn_messenger_source(pn_messenger_t *messenger, const char *source) +pn_link_t *pn_messenger_source(pn_messenger_t *messenger, const char *source, + pn_seconds_t timeout) { - return pn_messenger_link(messenger, source, false); + return pn_messenger_link(messenger, source, false, timeout); } -pn_link_t *pn_messenger_target(pn_messenger_t *messenger, const char *target) +pn_link_t *pn_messenger_target(pn_messenger_t *messenger, const char *target, + pn_seconds_t timeout) { - return pn_messenger_link(messenger, target, true); + return pn_messenger_link(messenger, target, true, timeout); } pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char *source) { + return pn_messenger_subscribe_ttl(messenger, source, 0); +} + +pn_subscription_t *pn_messenger_subscribe_ttl(pn_messenger_t *messenger, + const char *source, + pn_seconds_t timeout) +{ pni_route(messenger, source); if 
(pn_error_code(messenger->error)) return NULL; @@ -1647,7 +1792,7 @@ pn_subscription_t *pn_messenger_subscribe(pn_messenger_t *messenger, const char return NULL; } } else { - pn_link_t *src = pn_messenger_source(messenger, source); + pn_link_t *src = pn_messenger_source(messenger, source, timeout); if (!src) return NULL; pn_link_ctx_t *ctx = (pn_link_ctx_t *) pn_link_get_context( src ); return ctx ? ctx->subscription : NULL; @@ -1820,7 +1965,7 @@ int pn_messenger_put(pn_messenger_t *messenger, pn_message_t *msg) } else { pni_restore(messenger, msg); pn_buffer_append(buf, encoded, size); // XXX - pn_link_t *sender = pn_messenger_target(messenger, address); + pn_link_t *sender = pn_messenger_target(messenger, address, 0); if (!sender) { int err = pn_error_code(messenger->error); if (err) { @@ -1865,6 +2010,18 @@ pn_status_t pn_messenger_status(pn_messenger_t *messenger, pn_tracker_t tracker) } } +pn_delivery_t *pn_messenger_delivery(pn_messenger_t *messenger, + pn_tracker_t tracker) +{ + pni_store_t *store = pn_tracker_store(messenger, tracker); + pni_entry_t *e = pni_store_entry(store, pn_tracker_sequence(tracker)); + if (e) { + return pni_entry_get_delivery(e); + } else { + return NULL; + } +} + bool pn_messenger_buffered(pn_messenger_t *messenger, pn_tracker_t tracker) { pni_store_t *store = pn_tracker_store(messenger, tracker); @@ -2007,7 +2164,9 @@ int pn_messenger_recv(pn_messenger_t *messenger, int n) return pn_error_format(messenger->error, PN_STATE_ERR, "no valid sources"); // re-compute credit, and update credit scheduler - if (n == -1) { + if (n == -2) { + messenger->credit_mode = LINK_CREDIT_MANUAL; + } else if (n == -1) { messenger->credit_mode = LINK_CREDIT_AUTO; } else { messenger->credit_mode = LINK_CREDIT_EXPLICIT; @@ -2100,6 +2259,20 @@ int pn_messenger_reject(pn_messenger_t *messenger, pn_tracker_t tracker, int fla PN_STATUS_REJECTED, flags, false, false); } +PN_EXTERN pn_link_t *pn_messenger_tracker_link(pn_messenger_t *messenger, + pn_tracker_t 
tracker) +{ + pni_store_t *store = pn_tracker_store(messenger, tracker); + pni_entry_t *e = pni_store_entry(store, pn_tracker_sequence(tracker)); + if (e) { + pn_delivery_t *d = pni_entry_get_delivery(e); + if (d) { + return pn_delivery_link(d); + } + } + return NULL; +} + int pn_messenger_queued(pn_messenger_t *messenger, bool sender) { if (!messenger) return 0; @@ -2146,3 +2319,81 @@ int pn_messenger_rewrite(pn_messenger_t *messenger, const char *pattern, const c pn_transform_rule(messenger->rewrites, pattern, address); return 0; } + +PN_EXTERN int pn_messenger_set_flags(pn_messenger_t *messenger, const int flags) +{ + if (!messenger) + return PN_ARG_ERR; + if (flags != 0 && (flags ^ PN_FLAGS_CHECK_ROUTES) != 0) + return PN_ARG_ERR; + messenger->flags = flags; + return 0; +} + +PN_EXTERN int pn_messenger_get_flags(pn_messenger_t *messenger) +{ + return messenger ? messenger->flags : 0; +} + +int pn_messenger_set_snd_settle_mode(pn_messenger_t *messenger, + const pn_snd_settle_mode_t mode) +{ + if (!messenger) + return PN_ARG_ERR; + messenger->snd_settle_mode = mode; + return 0; +} + +int pn_messenger_set_rcv_settle_mode(pn_messenger_t *messenger, + const pn_rcv_settle_mode_t mode) +{ + if (!messenger) + return PN_ARG_ERR; + messenger->rcv_settle_mode = mode; + return 0; +} + +void pn_messenger_set_tracer(pn_messenger_t *messenger, pn_tracer_t tracer) +{ + assert(messenger); + assert(tracer); + + messenger->tracer = tracer; +} + +pn_millis_t pn_messenger_get_remote_idle_timeout(pn_messenger_t *messenger, + const char *address) +{ + if (!messenger) + return PN_ARG_ERR; + + pn_address_t addr; + addr.text = pn_string(address); + pni_parse(&addr); + + pn_millis_t timeout = -1; + for (size_t i = 0; i < pn_list_size(messenger->connections); i++) { + pn_connection_t *connection = + (pn_connection_t *)pn_list_get(messenger->connections, i); + pn_connection_ctx_t *ctx = + (pn_connection_ctx_t *)pn_connection_get_context(connection); + if (pn_streq(addr.scheme, ctx->scheme) 
&& pn_streq(addr.host, ctx->host) && + pn_streq(addr.port, ctx->port)) { + pn_transport_t *transport = pn_connection_transport(connection); + if (transport) + timeout = pn_transport_get_remote_idle_timeout(transport); + break; + } + } + return timeout; +} + +int +pn_messenger_set_ssl_peer_authentication_mode(pn_messenger_t *messenger, + const pn_ssl_verify_mode_t mode) +{ + if (!messenger) + return PN_ARG_ERR; + messenger->ssl_peer_authentication_mode = mode; + return 0; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/messenger/store.c ---------------------------------------------------------------------- diff --git a/proton-c/src/messenger/store.c b/proton-c/src/messenger/store.c index 88d6a5d..83b9b68 100644 --- a/proton-c/src/messenger/store.c +++ b/proton-c/src/messenger/store.c @@ -28,7 +28,7 @@ #endif #include <stdlib.h> #include <string.h> -#include "../util.h" +#include "util.h" #include "store.h" typedef struct pni_stream_t pni_stream_t; @@ -89,7 +89,7 @@ pni_store_t *pni_store() store->window = 0; store->lwm = 0; store->hwm = 0; - store->tracked = pn_hash(0, 0.75, PN_REFCOUNT); + store->tracked = pn_hash(PN_OBJECT, 0, 0.75); return store; } @@ -197,6 +197,7 @@ pni_stream_t *pni_stream_get(pni_store_t *store, const char *address) return pni_stream(store, address, false); } +#define CID_pni_entry CID_pn_object #define pni_entry_initialize NULL #define pni_entry_hashcode NULL #define pni_entry_compare NULL @@ -210,7 +211,7 @@ pni_entry_t *pni_store_put(pni_store_t *store, const char *address) if (!address) address = ""; pni_stream_t *stream = pni_stream_put(store, address); if (!stream) return NULL; - pni_entry_t *entry = (pni_entry_t *) pn_new(sizeof(pni_entry_t), &clazz); + pni_entry_t *entry = (pni_entry_t *) pn_class_new(&clazz, sizeof(pni_entry_t)); if (!entry) return NULL; entry->stream = stream; entry->free = false; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/messenger/subscription.c 
---------------------------------------------------------------------- diff --git a/proton-c/src/messenger/subscription.c b/proton-c/src/messenger/subscription.c index 346a23f..c26d40a 100644 --- a/proton-c/src/messenger/subscription.c +++ b/proton-c/src/messenger/subscription.c @@ -55,6 +55,7 @@ void pn_subscription_finalize(void *obj) pn_free(sub->address); } +#define CID_pn_subscription CID_pn_object #define pn_subscription_hashcode NULL #define pn_subscription_compare NULL #define pn_subscription_inspect NULL @@ -65,13 +66,13 @@ pn_subscription_t *pn_subscription(pn_messenger_t *messenger, const char *port) { static const pn_class_t clazz = PN_CLASS(pn_subscription); - pn_subscription_t *sub = (pn_subscription_t *) pn_new(sizeof(pn_subscription_t), &clazz); + pn_subscription_t *sub = (pn_subscription_t *) pn_class_new(&clazz, sizeof(pn_subscription_t)); sub->messenger = messenger; pn_string_set(sub->scheme, scheme); pn_string_set(sub->host, host); pn_string_set(sub->port, port); pni_messenger_add_subscription(messenger, sub); - pn_decref(sub); + pn_class_decref(PN_OBJECT, sub); return sub; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/messenger/transform.c ---------------------------------------------------------------------- diff --git a/proton-c/src/messenger/transform.c b/proton-c/src/messenger/transform.c index 801eb10..8f18667 100644 --- a/proton-c/src/messenger/transform.c +++ b/proton-c/src/messenger/transform.c @@ -19,7 +19,6 @@ * */ -#include <proton/object.h> #include <string.h> #include <assert.h> #include <ctype.h> @@ -55,6 +54,7 @@ static void pn_rule_finalize(void *object) pn_free(rule->substitution); } +#define CID_pn_rule CID_pn_object #define pn_rule_initialize NULL #define pn_rule_hashcode NULL #define pn_rule_compare NULL @@ -63,7 +63,7 @@ static void pn_rule_finalize(void *object) pn_rule_t *pn_rule(const char *pattern, const char *substitution) { static const pn_class_t clazz = PN_CLASS(pn_rule); - 
pn_rule_t *rule = (pn_rule_t *) pn_new(sizeof(pn_rule_t), &clazz); + pn_rule_t *rule = (pn_rule_t *) pn_class_new(&clazz, sizeof(pn_rule_t)); rule->pattern = pn_string(pattern); rule->substitution = pn_string(substitution); return rule; @@ -75,6 +75,7 @@ static void pn_transform_finalize(void *object) pn_free(transform->rules); } +#define CID_pn_transform CID_pn_object #define pn_transform_initialize NULL #define pn_transform_hashcode NULL #define pn_transform_compare NULL @@ -83,8 +84,8 @@ static void pn_transform_finalize(void *object) pn_transform_t *pn_transform() { static const pn_class_t clazz = PN_CLASS(pn_transform); - pn_transform_t *transform = (pn_transform_t *) pn_new(sizeof(pn_transform_t), &clazz); - transform->rules = pn_list(0, PN_REFCOUNT); + pn_transform_t *transform = (pn_transform_t *) pn_class_new(&clazz, sizeof(pn_transform_t)); + transform->rules = pn_list(PN_OBJECT, 0); transform->matched = false; return transform; } @@ -239,3 +240,15 @@ bool pn_transform_matched(pn_transform_t *transform) { return transform->matched; } + +int pn_transform_get_substitutions(pn_transform_t *transform, + pn_list_t *substitutions) +{ + int size = pn_list_size(transform->rules); + for (size_t i = 0; i < (size_t)size; i++) { + pn_rule_t *rule = (pn_rule_t *)pn_list_get(transform->rules, i); + pn_list_add(substitutions, rule->substitution); + } + + return size; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/messenger/transform.h ---------------------------------------------------------------------- diff --git a/proton-c/src/messenger/transform.h b/proton-c/src/messenger/transform.h index 1662f38..8160be3 100644 --- a/proton-c/src/messenger/transform.h +++ b/proton-c/src/messenger/transform.h @@ -22,6 +22,7 @@ * */ +#include <proton/object.h> #include <proton/buffer.h> typedef struct pn_transform_t pn_transform_t; @@ -32,6 +33,7 @@ void pn_transform_rule(pn_transform_t *transform, const char *pattern, int 
pn_transform_apply(pn_transform_t *transform, const char *src, pn_string_t *dest); bool pn_transform_matched(pn_transform_t *transform); - +int pn_transform_get_substitutions(pn_transform_t *transform, + pn_list_t *substitutions); #endif /* transform.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/object/iterator.c ---------------------------------------------------------------------- diff --git a/proton-c/src/object/iterator.c b/proton-c/src/object/iterator.c new file mode 100644 index 0000000..61b3b8e --- /dev/null +++ b/proton-c/src/object/iterator.c @@ -0,0 +1,78 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ *
+ */
+
+#include <proton/object.h>
+#include <stdlib.h>
+#include <assert.h>
+
+/*
+ * Generic iterator object.
+ *   next  - step callback; cleared to NULL once the iteration is exhausted
+ *   size  - number of bytes currently allocated for state
+ *   state - scratch buffer owned by the iterator and passed to next
+ */
+struct pn_iterator_t {
+  pn_iterator_next_t next;
+  size_t size;
+  void *state;
+};
+
+/* Class initializer: start with no callback and no scratch buffer. */
+static void pn_iterator_initialize(void *object)
+{
+  pn_iterator_t *it = (pn_iterator_t *) object;
+  it->next = NULL;
+  it->size = 0;
+  it->state = NULL;
+}
+
+/* Class finalizer: release the scratch buffer (free(NULL) is a no-op). */
+static void pn_iterator_finalize(void *object)
+{
+  pn_iterator_t *it = (pn_iterator_t *) object;
+  free(it->state);
+}
+
+#define CID_pn_iterator CID_pn_object
+#define pn_iterator_hashcode NULL
+#define pn_iterator_compare NULL
+#define pn_iterator_inspect NULL
+
+/* Allocate a new, idle iterator through the object system. */
+pn_iterator_t *pn_iterator()
+{
+  static const pn_class_t clazz = PN_CLASS(pn_iterator);
+  pn_iterator_t *it = (pn_iterator_t *) pn_class_new(&clazz, sizeof(pn_iterator_t));
+  return it;
+}
+
+/*
+ * Begin a new iteration.
+ *
+ * iterator - the iterator to (re)start; must not be NULL
+ * next     - step callback invoked by pn_iterator_next(); must not be NULL
+ * size     - number of bytes of scratch state the callback needs
+ *
+ * Returns the scratch buffer (at least size bytes) for the caller to fill
+ * in, or NULL if the buffer could not be grown. On failure the previous
+ * buffer is kept and the iterator is left exhausted.
+ */
+void *pn_iterator_start(pn_iterator_t *iterator, pn_iterator_next_t next,
+                        size_t size) {
+  assert(iterator);
+  assert(next);
+  if (iterator->size < size) {
+    /* Grow through a temporary so a failed realloc does not leak the
+     * existing buffer. */
+    void *grown = realloc(iterator->state, size);
+    if (!grown) {
+      iterator->next = NULL;
+      return NULL;
+    }
+    iterator->state = grown;
+    /* Record the new capacity so later starts can reuse the buffer. */
+    iterator->size = size;
+  }
+  iterator->next = next;
+  return iterator->state;
+}
+
+/*
+ * Advance the iteration.
+ *
+ * Returns whatever the step callback returns. Once the callback returns
+ * NULL it is cleared, so every later call returns NULL without invoking it.
+ */
+void *pn_iterator_next(pn_iterator_t *iterator) {
+  assert(iterator);
+  if (iterator->next) {
+    void *result = iterator->next(iterator->state);
+    if (!result) iterator->next = NULL;
+    return result;
+  } else {
+    return NULL;
+  }
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/object/list.c
----------------------------------------------------------------------
diff --git a/proton-c/src/object/list.c b/proton-c/src/object/list.c
new file mode 100644
index 0000000..7936f5b
--- /dev/null
+++ b/proton-c/src/object/list.c
@@ -0,0 +1,225 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/object.h>
+#include <stdlib.h>
+#include <assert.h>
+
+/*
+ * Growable array of object pointers.
+ *   clazz    - class used to incref/decref/inspect the stored elements
+ *   capacity - number of slots allocated in elements
+ *   size     - number of slots currently in use
+ *   elements - the slot array
+ */
+struct pn_list_t {
+  const pn_class_t *clazz;
+  size_t capacity;
+  size_t size;
+  void **elements;
+};
+
+/* Number of elements currently stored in the list. */
+size_t pn_list_size(pn_list_t *list)
+{
+  assert(list);
+  return list->size;
+}
+
+/*
+ * Fetch the element at index. The list must be non-empty; the index is
+ * reduced modulo the current size (after conversion to size_t), so
+ * indices past the end wrap around rather than reading out of bounds.
+ */
+void *pn_list_get(pn_list_t *list, int index)
+{
+  assert(list); assert(list->size);
+  return list->elements[index % list->size];
+}
+
+/*
+ * Replace the element at index (modulo size, list must be non-empty).
+ * The old element is decref'ed and the new value is incref'ed.
+ */
+void pn_list_set(pn_list_t *list, int index, void *value)
+{
+  assert(list); assert(list->size);
+  void *old = list->elements[index % list->size];
+  pn_class_decref(list->clazz, old);
+  list->elements[index % list->size] = value;
+  pn_class_incref(list->clazz, value);
+}
+
+/*
+ * Make sure at least capacity slots are allocated, doubling the current
+ * capacity until it is large enough.
+ * NOTE(review): allocation failure is only caught by the assert, so a
+ * build with NDEBUG would continue with a NULL array -- confirm whether
+ * that is acceptable.
+ */
+void pn_list_ensure(pn_list_t *list, size_t capacity)
+{
+  assert(list);
+  if (list->capacity < capacity) {
+    size_t newcap = list->capacity;
+    while (newcap < capacity) { newcap *= 2; }
+    list->elements = (void **) realloc(list->elements, newcap * sizeof(void *));
+    assert(list->elements);
+    list->capacity = newcap;
+  }
+}
+
+/* Append value to the end of the list and incref it. Always returns 0. */
+int pn_list_add(pn_list_t *list, void *value)
+{
+  assert(list);
+  pn_list_ensure(list, list->size + 1);
+  list->elements[list->size++] = value;
+  pn_class_incref(list->clazz, value);
+  return 0;
+}
+
+/*
+ * Linear search using pn_equals. Returns the index of the first element
+ * equal to value, or -1 if there is none.
+ */
+ssize_t pn_list_index(pn_list_t *list, void *value)
+{
+  for (size_t i = 0; i < list->size; i++) {
+    if (pn_equals(list->elements[i], value)) {
+      return i;
+    }
+  }
+
+  return -1;
+}
+
+/*
+ * Remove the first element equal to value. Returns true if an element
+ * was removed, false if no match was found.
+ */
+bool pn_list_remove(pn_list_t *list, void *value)
+{
+  assert(list);
+  ssize_t idx = pn_list_index(list, value);
+  if (idx < 0) {
+    return false;
+  } else {
+    pn_list_del(list, idx, 1);
+  }
+
+  return true;
+}
+
+/*
+ * Delete n elements starting at index (taken modulo size), decref'ing
+ * each one and sliding the tail of the list down over the gap.
+ * Deleting from an empty list is a no-op.
+ * NOTE(review): index + n is not checked against size; callers appear to
+ * be responsible for staying in range.
+ */
+void pn_list_del(pn_list_t *list, int index, int n)
+{
+  assert(list);
+  /* Nothing to delete from an empty list; returning early also keeps the
+   * modulo below from dividing by zero (pn_list_clear gets here with
+   * size == 0). */
+  if (!list->size) { return; }
+  index %= list->size;
+
+  for (int i = 0; i < n; i++) {
+    pn_class_decref(list->clazz, list->elements[index + i]);
+  }
+
+  size_t slide = list->size - (index + n);
+  for (size_t i = 0; i < slide; i++) {
+    list->elements[index + i] = list->elements[index + n + i];
+  }
+
+  list->size -= n;
+}
+
+/* Remove (and decref) every element; the allocated capacity is kept. */
+void pn_list_clear(pn_list_t *list)
+{
+  assert(list);
+  pn_list_del(list, 0, list->size);
+}
+
+/* Append value to the list n times. */
+void pn_list_fill(pn_list_t *list, void *value, int n)
+{
+  for (int i = 0; i < n; i++) {
+    pn_list_add(list, value);
+  }
+}
+
+/* Iteration state kept in the iterator's scratch buffer. */
+typedef struct {
+  pn_list_t *list;
+  size_t index;
+} pni_list_iter_t;
+
+/* Iterator step: return the next element, or NULL past the end. */
+static void *pni_list_next(void *ctx)
+{
+  pni_list_iter_t *iter = (pni_list_iter_t *) ctx;
+  if (iter->index < pn_list_size(iter->list)) {
+    return pn_list_get(iter->list, iter->index++);
+  } else {
+    return NULL;
+  }
+}
+
+/* Point iter at the first element of list. */
+void pn_list_iterator(pn_list_t *list, pn_iterator_t *iter)
+{
+  pni_list_iter_t *liter = (pni_list_iter_t *) pn_iterator_start(iter, pni_list_next, sizeof(pni_list_iter_t));
+  liter->list = list;
+  liter->index = 0;
+}
+
+/* Class finalizer: decref every element, then free the slot array. */
+static void pn_list_finalize(void *object)
+{
+  assert(object);
+  pn_list_t *list = (pn_list_t *) object;
+  for (size_t i = 0; i < list->size; i++) {
+    pn_class_decref(list->clazz, pn_list_get(list, i));
+  }
+  free(list->elements);
+}
+
+/* Class hashcode: order-sensitive combination (x31) of element hashes. */
+static uintptr_t pn_list_hashcode(void *object)
+{
+  assert(object);
+  pn_list_t *list = (pn_list_t *) object;
+  uintptr_t hash = 1;
+
+  for (size_t i = 0; i < list->size; i++) {
+    hash = hash * 31 + pn_hashcode(pn_list_get(list, i));
+  }
+
+  return hash;
+}
+
+/*
+ * Class comparison: lists of different length compare by the size
+ * difference (nb - na); otherwise the first non-zero element comparison
+ * decides. Returns 0 when all elements compare equal.
+ */
+static intptr_t pn_list_compare(void *oa, void *ob)
+{
+  assert(oa); assert(ob);
+  pn_list_t *a = (pn_list_t *) oa;
+  pn_list_t *b = (pn_list_t *) ob;
+
+  size_t na = pn_list_size(a);
+  size_t nb = pn_list_size(b);
+  if (na != nb) {
+    return nb - na;
+  } else {
+    for (size_t i = 0; i < na; i++) {
+      intptr_t delta = pn_compare(pn_list_get(a, i), pn_list_get(b, i));
+      if (delta) return delta;
+    }
+  }
+
+  return 0;
+}
+
+/*
+ * Class inspect: append "[e0, e1, ...]" to dst, formatting each element
+ * with the element class. Returns the first non-zero error encountered.
+ */
+static int pn_list_inspect(void *obj, pn_string_t *dst)
+{
+  assert(obj);
+  pn_list_t *list = (pn_list_t *) obj;
+  int err = pn_string_addf(dst, "[");
+  if (err) return err;
+  size_t n = pn_list_size(list);
+  for (size_t i = 0; i < n; i++) {
+    if (i > 0) {
+      err = pn_string_addf(dst, ", ");
+      if (err) return err;
+    }
+    err = pn_class_inspect(list->clazz, pn_list_get(list, i), dst);
+    if (err) return err;
+  }
+  return pn_string_addf(dst, "]");
+}
+
+#define pn_list_initialize NULL
+
+/*
+ * Create an empty list whose elements are managed through clazz.
+ * A capacity of 0 selects the default of 16 slots.
+ * NOTE(review): the malloc result is not checked here.
+ */
+pn_list_t *pn_list(const pn_class_t *clazz, size_t capacity)
+{
+  static const pn_class_t list_clazz = PN_CLASS(pn_list);
+
+  pn_list_t *list = (pn_list_t *) pn_class_new(&list_clazz, sizeof(pn_list_t));
+  list->clazz = clazz;
+  list->capacity = capacity ? capacity : 16;
+  list->elements = (void **) malloc(list->capacity * sizeof(void *));
+  list->size = 0;
+  return list;
+}
+
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-c/src/object/map.c
----------------------------------------------------------------------
diff --git a/proton-c/src/object/map.c b/proton-c/src/object/map.c
new file mode 100644
index 0000000..fc98116
--- /dev/null
+++ b/proton-c/src/object/map.c
@@ -0,0 +1,401 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <proton/object.h>
+#include <stdlib.h>
+#include <assert.h>
+
+/* Slot states: unused, in a chain with a successor, or last in a chain. */
+#define PNI_ENTRY_FREE (0)
+#define PNI_ENTRY_LINK (1)
+#define PNI_ENTRY_TAIL (2)
+
+/*
+ * One slot of the table.
+ *   next  - index of the following slot in the chain (used when LINK)
+ *   state - one of the PNI_ENTRY_* values above
+ */
+typedef struct {
+  void *key;
+  void *value;
+  size_t next;
+  uint8_t state;
+} pni_entry_t;
+
+/*
+ * Hash map with chains stored inside the slot array itself.
+ *   key, value  - classes used to incref/decref/inspect keys and values
+ *   entries     - slot array of capacity elements
+ *   addressable - number of leading slots a hashcode can map to directly
+ *   size        - number of occupied slots
+ *   hashcode, equals - key hashing and comparison callbacks
+ *   load_factor - size/addressable ratio above which the table grows
+ */
+struct pn_map_t {
+  const pn_class_t *key;
+  const pn_class_t *value;
+  pni_entry_t *entries;
+  size_t capacity;
+  size_t addressable;
+  size_t size;
+  uintptr_t (*hashcode)(void *key);
+  bool (*equals)(void *a, void *b);
+  float load_factor;
+};
+
+/* Class finalizer: decref every stored key/value, then free the slots. */
+static void pn_map_finalize(void *object)
+{
+  pn_map_t *map = (pn_map_t *) object;
+
+  for (size_t i = 0; i < map->capacity; i++) {
+    if (map->entries[i].state != PNI_ENTRY_FREE) {
+      pn_class_decref(map->key, map->entries[i].key);
+      pn_class_decref(map->value, map->entries[i].value);
+    }
+  }
+
+  free(map->entries);
+}
+
+/*
+ * Class hashcode: sum over occupied slots of hash(key) ^ hash(value).
+ * Addition makes the result independent of slot order.
+ */
+static uintptr_t pn_map_hashcode(void *object)
+{
+  pn_map_t *map = (pn_map_t *) object;
+
+  uintptr_t hashcode = 0;
+
+  for (size_t i = 0; i < map->capacity; i++) {
+    if (map->entries[i].state != PNI_ENTRY_FREE) {
+      void *key = map->entries[i].key;
+      void *value = map->entries[i].value;
+      hashcode += pn_hashcode(key) ^ pn_hashcode(value);
+    }
+  }
+
+  return hashcode;
+}
+
+/*
+ * Allocate map->capacity slots, mark them all free and reset size to 0.
+ * The previous entries pointer is overwritten, not freed; callers keep
+ * their own copy when they still need the old array.
+ * NOTE(review): the malloc result is not checked.
+ */
+static void pni_map_allocate(pn_map_t *map)
+{
+  map->entries = (pni_entry_t *) malloc(map->capacity * sizeof (pni_entry_t));
+  for (size_t i = 0; i < map->capacity; i++) {
+    map->entries[i].key = NULL;
+    map->entries[i].value = NULL;
+    map->entries[i].next = 0;
+    map->entries[i].state = PNI_ENTRY_FREE;
+  }
+  map->size = 0;
+}
+
+/*
+ * Class inspect: append "{k0: v0, k1: v1, ...}" to dst in slot order.
+ * Returns the first non-zero error encountered.
+ */
+static int pn_map_inspect(void *obj, pn_string_t *dst)
+{
+  assert(obj);
+  pn_map_t *map = (pn_map_t *) obj;
+  int err = pn_string_addf(dst, "{");
+  if (err) return err;
+  pn_handle_t entry = pn_map_head(map);
+  bool first = true;
+  while (entry) {
+    if (first) {
+      first = false;
+    } else {
+      err = pn_string_addf(dst, ", ");
+      if (err) return err;
+    }
+    err = pn_class_inspect(map->key, pn_map_key(map, entry), dst);
+    if (err) return err;
+    err = pn_string_addf(dst, ": ");
+    if (err) return err;
+    err = pn_class_inspect(map->value, pn_map_value(map, entry), dst);
+    if (err) return err;
+    entry = pn_map_next(map, entry);
+  }
+  return pn_string_addf(dst, "}");
+}
+
+#define pn_map_initialize NULL
+#define pn_map_compare NULL
+
+/*
+ * Create an empty map.
+ *
+ * key, value  - classes used to manage stored keys and values
+ * capacity    - initial slot count; 0 selects the default of 16
+ * load_factor - growth threshold for size/addressable
+ *
+ * Hashing and equality default to pn_hashcode / pn_equals.
+ */
+pn_map_t *pn_map(const pn_class_t *key, const pn_class_t *value,
+                 size_t capacity, float load_factor)
+{
+  static const pn_class_t clazz = PN_CLASS(pn_map);
+
+  pn_map_t *map = (pn_map_t *) pn_class_new(&clazz, sizeof(pn_map_t));
+  map->key = key;
+  map->value = value;
+  map->capacity = capacity ? capacity : 16;
+  /* Only the first 86% of the slots are hashed into directly. */
+  map->addressable = (size_t) (map->capacity * 0.86);
+  /* Tiny capacities would round down to 0 and break the modulo. */
+  if (!map->addressable) map->addressable = map->capacity;
+  map->load_factor = load_factor;
+  map->hashcode = pn_hashcode;
+  map->equals = pn_equals;
+  pni_map_allocate(map);
+  return map;
+}
+
+/* Number of key/value pairs currently stored. */
+size_t pn_map_size(pn_map_t *map)
+{
+  assert(map);
+  return map->size;
+}
+
+/* Current load: occupied slots divided by addressable slots. */
+static float pni_map_load(pn_map_t *map)
+{
+  return ((float) map->size) / ((float) map->addressable);
+}
+
+/*
+ * Grow the table if it has fewer than capacity slots or is over its load
+ * factor. Growth doubles the capacity until both conditions hold, then
+ * re-inserts every old entry into a freshly allocated slot array.
+ *
+ * Returns true if the table was rebuilt (all slot pointers are then
+ * stale), false if nothing changed.
+ */
+static bool pni_map_ensure(pn_map_t *map, size_t capacity)
+{
+  float load = pni_map_load(map);
+  if (capacity <= map->capacity && load <= map->load_factor) {
+    return false;
+  }
+
+  size_t oldcap = map->capacity;
+
+  while (map->capacity < capacity || pni_map_load(map) > map->load_factor) {
+    map->capacity *= 2;
+    map->addressable = (size_t) (0.86 * map->capacity);
+  }
+
+  pni_entry_t *entries = map->entries;
+  pni_map_allocate(map);
+
+  for (size_t i = 0; i < oldcap; i++) {
+    if (entries[i].state != PNI_ENTRY_FREE) {
+      void *key = entries[i].key;
+      void *value = entries[i].value;
+      /* pn_map_put takes its own references, so drop the ones that were
+       * held by the old slot array afterwards. */
+      pn_map_put(map, key, value);
+      pn_class_decref(map->key, key);
+      pn_class_decref(map->value, value);
+    }
+  }
+
+  free(entries);
+  return true;
+}
+
+/*
+ * Locate the slot for key, optionally creating it.
+ *
+ * map    - the map to search
+ * key    - key to look up (hashed with map->hashcode, compared with
+ *          map->equals)
+ * pprev  - if non-NULL, receives the chain predecessor of the returned
+ *          slot when an existing key is found or a slot is appended to a
+ *          chain; it is not written when the home slot was free
+ * create - when true, a missing key gets a new slot (key incref'ed,
+ *          size incremented); when false, a missing key yields NULL
+ *
+ * The home slot is hashcode % addressable. Colliding keys are chained
+ * through the next indices and placed in the highest-index free slot.
+ */
+static pni_entry_t *pni_map_entry(pn_map_t *map, void *key, pni_entry_t **pprev, bool create)
+{
+  uintptr_t hashcode = map->hashcode(key);
+
+  pni_entry_t *entry = &map->entries[hashcode % map->addressable];
+  pni_entry_t *prev = NULL;
+
+  if (entry->state == PNI_ENTRY_FREE) {
+    if (create) {
+      entry->state = PNI_ENTRY_TAIL;
+      entry->key = key;
+      pn_class_incref(map->key, key);
+      map->size++;
+      return entry;
+    } else {
+      return NULL;
+    }
+  }
+
+  /* Walk the chain that starts at the home slot. */
+  while (true) {
+    if (map->equals(entry->key, key)) {
+      if (pprev) *pprev = prev;
+      return entry;
+    }
+
+    if (entry->state == PNI_ENTRY_TAIL) {
+      break;
+    } else {
+      prev = entry;
+      entry = &map->entries[entry->next];
+    }
+  }
+
+  if (create) {
+    if (pni_map_ensure(map, map->size + 1)) {
+      // if we had to grow the table we need to start over
+      return pni_map_entry(map, key, pprev, create);
+    }
+
+    /* Scan from the top of the table down for a free slot.
+     * NOTE(review): if no slot is free, empty stays 0 and slot 0 is
+     * overwritten -- presumably the ensure call above rules this out;
+     * confirm. */
+    size_t empty = 0;
+    for (size_t i = 0; i < map->capacity; i++) {
+      size_t idx = map->capacity - i - 1;
+      if (map->entries[idx].state == PNI_ENTRY_FREE) {
+        empty = idx;
+        break;
+      }
+    }
+    /* Append the new slot to the end of the chain. */
+    entry->next = empty;
+    entry->state = PNI_ENTRY_LINK;
+    map->entries[empty].state = PNI_ENTRY_TAIL;
+    map->entries[empty].key = key;
+    pn_class_incref(map->key, key);
+    if (pprev) *pprev = entry;
+    map->size++;
+    return &map->entries[empty];
+  } else {
+    return NULL;
+  }
+}
+
+/*
+ * Associate value with key, creating the entry if needed. Any previous
+ * value is decref'ed and the new one incref'ed. Always returns 0.
+ */
+int pn_map_put(pn_map_t *map, void *key, void *value)
+{
+  assert(map);
+  pni_entry_t *entry = pni_map_entry(map, key, NULL, true);
+  pn_class_decref(map->value, entry->value);
+  entry->value = value;
+  pn_class_incref(map->value, value);
+  return 0;
+}
+
+/* Return the value stored for key, or NULL if the key is absent. */
+void *pn_map_get(pn_map_t *map, void *key)
+{
+  assert(map);
+  pni_entry_t *entry = pni_map_entry(map, key, NULL, false);
+  return entry ? entry->value : NULL;
+}
+
+/*
+ * Remove key from the map if present, releasing the references held on
+ * the key and value. Does nothing when the key is absent.
+ */
+void pn_map_del(pn_map_t *map, void *key)
+{
+  assert(map);
+  pni_entry_t *prev = NULL;
+  pni_entry_t *entry = pni_map_entry(map, key, &prev, false);
+  if (entry) {
+    void *dref_key = entry->key;
+    void *dref_value = entry->value;
+    if (prev) {
+      /* Unlink from the middle or end: the predecessor inherits this
+       * slot's successor and state. */
+      prev->next = entry->next;
+      prev->state = entry->state;
+    } else if (entry->next) {
+      /* Removing the chain head: pull the successor into the head slot
+       * and free the successor's slot instead.
+       * NOTE(review): a non-zero next is used as the "has successor"
+       * test, so a chain whose successor lives in slot 0 would not be
+       * recognised -- confirm whether that can occur. */
+      assert(entry->state == PNI_ENTRY_LINK);
+      pni_entry_t *next = &map->entries[entry->next];
+      *entry = *next;
+      entry = next;
+    }
+    entry->state = PNI_ENTRY_FREE;
+    entry->next = 0;
+    entry->key = NULL;
+    entry->value = NULL;
+    map->size--;
+    pn_class_decref(map->key, dref_key);
+    pn_class_decref(map->value, dref_value);
+  }
+}
+
+/*
+ * Handle of the first occupied slot, or 0 if the map is empty.
+ * Handles are slot index + 1 so that 0 can mean "no entry".
+ */
+pn_handle_t pn_map_head(pn_map_t *map)
+{
+  assert(map);
+  for (size_t i = 0; i < map->capacity; i++)
+  {
+    if (map->entries[i].state != PNI_ENTRY_FREE) {
+      return i + 1;
+    }
+  }
+
+  return 0;
+}
+
+/*
+ * Handle of the next occupied slot after entry, or 0 at the end.
+ * Since a handle is index + 1, starting the scan at i = entry begins
+ * with the slot right after the current one.
+ */
+pn_handle_t pn_map_next(pn_map_t *map, pn_handle_t entry)
+{
+  for (size_t i = entry; i < map->capacity; i++) {
+    if (map->entries[i].state != PNI_ENTRY_FREE) {
+      return i + 1;
+    }
+  }
+
+  return 0;
+}
+
+/* Key stored in the slot identified by a non-zero handle. */
+void *pn_map_key(pn_map_t *map, pn_handle_t entry)
+{
+  assert(map);
+  assert(entry);
+  return map->entries[entry - 1].key;
+}
+
+/* Value stored in the slot identified by a non-zero handle. */
+void *pn_map_value(pn_map_t *map, pn_handle_t entry)
+{
+  assert(map);
+  assert(entry);
+  return map->entries[entry - 1].value;
+}
+
+/*
+ * A hash is a map keyed by uintptr_t values. The map is the sole member,
+ * which is what lets pn_hash() cast a pn_map_t pointer to pn_hash_t.
+ */
+struct pn_hash_t {
+  pn_map_t map;
+};
+
+/* Hash a uintptr_t key (carried in a void *) as its own value. */
+static uintptr_t pni_identity_hashcode(void *obj)
+{
+  return (uintptr_t ) obj;
+}
+
+/* Keys are equal exactly when the carried values are identical. */
+static bool pni_identity_equals(void *a, void *b)
+{
+  return a == b;
+}
+
+extern const pn_class_t *PN_UINTPTR;
+
+/*
+ * Class for integer keys carried in a void *: incref and decref do
+ * nothing, refcount reports -1, and all other hooks are unset.
+ */
+#define CID_pni_uintptr CID_pn_void
+static const pn_class_t *pni_uintptr_reify(void *object) { return PN_UINTPTR; }
+#define pni_uintptr_new NULL
+#define pni_uintptr_free NULL
+#define pni_uintptr_initialize NULL
+static void pni_uintptr_incref(void *object) {}
+static void pni_uintptr_decref(void *object) {}
+static int pni_uintptr_refcount(void *object) { return -1; }
+#define pni_uintptr_finalize NULL
+#define pni_uintptr_hashcode NULL
+#define pni_uintptr_compare NULL
+#define pni_uintptr_inspect NULL
+
+const pn_class_t PNI_UINTPTR = PN_METACLASS(pni_uintptr);
+const pn_class_t *PN_UINTPTR = &PNI_UINTPTR;
+
+/*
+ * Create a hash: a map with PN_UINTPTR keys and values managed by clazz,
+ * using identity hashing and equality on the keys.
+ */
+pn_hash_t *pn_hash(const pn_class_t *clazz, size_t capacity, float load_factor)
+{
+  pn_hash_t *hash = (pn_hash_t *) pn_map(PN_UINTPTR, clazz, capacity, load_factor);
+  hash->map.hashcode = pni_identity_hashcode;
+  hash->map.equals = pni_identity_equals;
+  return hash;
+}
+
+/* The functions below forward to the pn_map_* equivalents, converting
+ * keys between uintptr_t and void *. */
+size_t pn_hash_size(pn_hash_t *hash)
+{
+  return pn_map_size(&hash->map);
+}
+
+int pn_hash_put(pn_hash_t *hash, uintptr_t key, void *value)
+{
+  return pn_map_put(&hash->map, (void *) key, value);
+}
+
+void *pn_hash_get(pn_hash_t *hash, uintptr_t key)
+{
+  return pn_map_get(&hash->map, (void *) key);
+}
+
+void pn_hash_del(pn_hash_t *hash, uintptr_t key)
+{
+  pn_map_del(&hash->map, (void *) key);
+}
+
+pn_handle_t pn_hash_head(pn_hash_t *hash)
+{
+  return pn_map_head(&hash->map);
+}
+
+pn_handle_t pn_hash_next(pn_hash_t *hash, pn_handle_t entry)
+{
+  return pn_map_next(&hash->map, entry);
+}
+
+uintptr_t pn_hash_key(pn_hash_t *hash, pn_handle_t entry)
+{
+  return (uintptr_t) pn_map_key(&hash->map, entry);
+}
+
+void *pn_hash_value(pn_hash_t *hash, pn_handle_t entry)
+{
+  return pn_map_value(&hash->map, entry);
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
