http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/engine/engine.c ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c deleted file mode 100644 index cb1f479..0000000 --- a/proton-c/src/engine/engine.c +++ /dev/null @@ -1,2231 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "engine-internal.h" -#include <stdlib.h> -#include <string.h> -#include "protocol.h" - -#include <assert.h> -#include <stdarg.h> -#include <stdio.h> - -#include "platform.h" -#include "platform_fmt.h" -#include "transport/transport.h" - - -static void pni_session_bound(pn_session_t *ssn); -static void pni_link_bound(pn_link_t *link); - - -// endpoints - -static pn_connection_t *pni_ep_get_connection(pn_endpoint_t *endpoint) -{ - switch (endpoint->type) { - case CONNECTION: - return (pn_connection_t *) endpoint; - case SESSION: - return ((pn_session_t *) endpoint)->connection; - case SENDER: - case RECEIVER: - return ((pn_link_t *) endpoint)->session->connection; - } - - return NULL; -} - -static pn_event_type_t endpoint_event(pn_endpoint_type_t type, bool open) { - switch (type) { - case CONNECTION: - return open ? PN_CONNECTION_LOCAL_OPEN : PN_CONNECTION_LOCAL_CLOSE; - case SESSION: - return open ? PN_SESSION_LOCAL_OPEN : PN_SESSION_LOCAL_CLOSE; - case SENDER: - case RECEIVER: - return open ? PN_LINK_LOCAL_OPEN : PN_LINK_LOCAL_CLOSE; - default: - assert(false); - return PN_EVENT_NONE; - } -} - -static void pn_endpoint_open(pn_endpoint_t *endpoint) -{ - if (!(endpoint->state & PN_LOCAL_ACTIVE)) { - PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE); - pn_connection_t *conn = pni_ep_get_connection(endpoint); - pn_collector_put(conn->collector, PN_OBJECT, endpoint, - endpoint_event(endpoint->type, true)); - pn_modified(conn, endpoint, true); - } -} - -static void pn_endpoint_close(pn_endpoint_t *endpoint) -{ - if (!(endpoint->state & PN_LOCAL_CLOSED)) { - PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED); - pn_connection_t *conn = pni_ep_get_connection(endpoint); - pn_collector_put(conn->collector, PN_OBJECT, endpoint, - endpoint_event(endpoint->type, false)); - pn_modified(conn, endpoint, true); - } -} - -void pn_connection_reset(pn_connection_t *connection) -{ - assert(connection); - pn_endpoint_t *endpoint = &connection->endpoint; - endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT; -} - -void pn_connection_open(pn_connection_t *connection) -{ - assert(connection); - pn_endpoint_open(&connection->endpoint); -} - -void pn_connection_close(pn_connection_t *connection) -{ - assert(connection); - pn_endpoint_close(&connection->endpoint); -} - -static void pni_endpoint_tini(pn_endpoint_t *endpoint); - -void pn_connection_release(pn_connection_t *connection) -{ - assert(!connection->endpoint.freed); - // free those endpoints that haven't been freed by the application - LL_REMOVE(connection, endpoint, &connection->endpoint); - while (connection->endpoint_head) { - pn_endpoint_t *ep = connection->endpoint_head; - switch (ep->type) { - case SESSION: - // note: this will free all child links: - pn_session_free((pn_session_t *)ep); - break; - case SENDER: - case RECEIVER: - pn_link_free((pn_link_t *)ep); - break; - default: - assert(false); - } - } - connection->endpoint.freed = true; - if (!connection->transport) { - // no transport available to consume transport work items, - // so manually clear them: - pn_ep_incref(&connection->endpoint); - pn_connection_unbound(connection); - } - pn_ep_decref(&connection->endpoint); -} - -void pn_connection_free(pn_connection_t *connection) { - pn_connection_release(connection); - pn_decref(connection); -} - -void pn_connection_bound(pn_connection_t *connection) -{ - pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND); - pn_ep_incref(&connection->endpoint); - - size_t nsessions = pn_list_size(connection->sessions); - for (size_t i = 0; i < nsessions; i++) { - pni_session_bound((pn_session_t *) pn_list_get(connection->sessions, i)); - } -} - -// invoked when transport has been removed: -void pn_connection_unbound(pn_connection_t *connection) -{ - connection->transport = NULL; - if (connection->endpoint.freed) { - // connection has been freed prior to unbinding, thus it - // cannot be re-assigned to a new transport. Clear the - // transport work lists to allow the connection to be freed. - while (connection->transport_head) { - pn_clear_modified(connection, connection->transport_head); - } - while (connection->tpwork_head) { - pn_clear_tpwork(connection->tpwork_head); - } - } - pn_ep_decref(&connection->endpoint); -} - -pn_record_t *pn_connection_attachments(pn_connection_t *connection) -{ - assert(connection); - return connection->context; -} - -void *pn_connection_get_context(pn_connection_t *conn) -{ - // XXX: we should really assert on conn here, but this causes - // messenger tests to fail - return conn ? pn_record_get(conn->context, PN_LEGCTX) : NULL; -} - -void pn_connection_set_context(pn_connection_t *conn, void *context) -{ - assert(conn); - pn_record_set(conn->context, PN_LEGCTX, context); -} - -pn_transport_t *pn_connection_transport(pn_connection_t *connection) -{ - assert(connection); - return connection->transport; -} - -void pn_condition_init(pn_condition_t *condition) -{ - condition->name = pn_string(NULL); - condition->description = pn_string(NULL); - condition->info = pn_data(0); -} - -void pn_condition_tini(pn_condition_t *condition) -{ - pn_data_free(condition->info); - pn_free(condition->description); - pn_free(condition->name); -} - -static void pni_add_session(pn_connection_t *conn, pn_session_t *ssn) -{ - pn_list_add(conn->sessions, ssn); - ssn->connection = conn; - pn_incref(conn); // keep around until finalized - pn_ep_incref(&conn->endpoint); -} - -static void pni_remove_session(pn_connection_t *conn, pn_session_t *ssn) -{ - if (pn_list_remove(conn->sessions, ssn)) { - pn_ep_decref(&conn->endpoint); - LL_REMOVE(conn, endpoint, &ssn->endpoint); - } -} - -pn_connection_t *pn_session_connection(pn_session_t *session) -{ - if (!session) return NULL; - return session->connection; -} - -void pn_session_open(pn_session_t *session) -{ - assert(session); - pn_endpoint_open(&session->endpoint); -} - -void pn_session_close(pn_session_t *session) -{ - assert(session); - pn_endpoint_close(&session->endpoint); -} - -void pn_session_free(pn_session_t *session) -{ - assert(!session->endpoint.freed); - while(pn_list_size(session->links)) { - pn_link_t *link = (pn_link_t *)pn_list_get(session->links, 0); - pn_link_free(link); - } - pni_remove_session(session->connection, session); - pn_list_add(session->connection->freed, session); - session->endpoint.freed = true; - pn_ep_decref(&session->endpoint); - - // the finalize logic depends on endpoint.freed, so we incref/decref - // to give it a chance to rerun - pn_incref(session); - pn_decref(session); -} - -pn_record_t *pn_session_attachments(pn_session_t *session) -{ - assert(session); - return session->context; -} - -void *pn_session_get_context(pn_session_t *session) -{ - return session ? pn_record_get(session->context, PN_LEGCTX) : 0; -} - -void pn_session_set_context(pn_session_t *session, void *context) -{ - assert(context); - pn_record_set(session->context, PN_LEGCTX, context); -} - - -static void pni_add_link(pn_session_t *ssn, pn_link_t *link) -{ - pn_list_add(ssn->links, link); - link->session = ssn; - pn_ep_incref(&ssn->endpoint); -} - -static void pni_remove_link(pn_session_t *ssn, pn_link_t *link) -{ - if (pn_list_remove(ssn->links, link)) { - pn_ep_decref(&ssn->endpoint); - LL_REMOVE(ssn->connection, endpoint, &link->endpoint); - } -} - -void pn_link_open(pn_link_t *link) -{ - assert(link); - pn_endpoint_open(&link->endpoint); -} - -void pn_link_close(pn_link_t *link) -{ - assert(link); - pn_endpoint_close(&link->endpoint); -} - -void pn_link_detach(pn_link_t *link) -{ - assert(link); - if (link->detached) return; - - link->detached = true; - pn_collector_put(link->session->connection->collector, PN_OBJECT, link, PN_LINK_LOCAL_DETACH); - pn_modified(link->session->connection, &link->endpoint, true); - -} - -static void pni_terminus_free(pn_terminus_t *terminus) -{ - pn_free(terminus->address); - pn_free(terminus->properties); - pn_free(terminus->capabilities); - pn_free(terminus->outcomes); - pn_free(terminus->filter); -} - -void pn_link_free(pn_link_t *link) -{ - assert(!link->endpoint.freed); - pni_remove_link(link->session, link); - pn_list_add(link->session->freed, link); - pn_delivery_t *delivery = link->unsettled_head; - while (delivery) { - pn_delivery_t *next = delivery->unsettled_next; - pn_delivery_settle(delivery); - delivery = next; - } - link->endpoint.freed = true; - pn_ep_decref(&link->endpoint); - - // the finalize logic depends on endpoint.freed (modified above), so - // we incref/decref to give it a chance to rerun - pn_incref(link); - pn_decref(link); -} - -void *pn_link_get_context(pn_link_t *link) -{ - assert(link); - return pn_record_get(link->context, PN_LEGCTX); -} - -void pn_link_set_context(pn_link_t *link, void *context) -{ - assert(link); - pn_record_set(link->context, PN_LEGCTX, context); -} - -pn_record_t *pn_link_attachments(pn_link_t *link) -{ - assert(link); - return link->context; -} - -void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn) -{ - endpoint->type = (pn_endpoint_type_t) type; - endpoint->referenced = true; - endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT; - endpoint->error = pn_error(); - pn_condition_init(&endpoint->condition); - pn_condition_init(&endpoint->remote_condition); - endpoint->endpoint_next = NULL; - endpoint->endpoint_prev = NULL; - endpoint->transport_next = NULL; - endpoint->transport_prev = NULL; - endpoint->modified = false; - endpoint->freed = false; - endpoint->refcount = 1; - //fprintf(stderr, "initting 0x%lx\n", (uintptr_t) endpoint); - - LL_ADD(conn, endpoint, endpoint); -} - -void pn_ep_incref(pn_endpoint_t *endpoint) -{ - endpoint->refcount++; -} - -static pn_event_type_t pn_final_type(pn_endpoint_type_t type) { - switch (type) { - case CONNECTION: - return PN_CONNECTION_FINAL; - case SESSION: - return PN_SESSION_FINAL; - case SENDER: - case RECEIVER: - return PN_LINK_FINAL; - default: - assert(false); - return PN_EVENT_NONE; - } -} - -static pn_endpoint_t *pn_ep_parent(pn_endpoint_t *endpoint) { - switch (endpoint->type) { - case CONNECTION: - return NULL; - case SESSION: - return &((pn_session_t *) endpoint)->connection->endpoint; - case SENDER: - case RECEIVER: - return &((pn_link_t *) endpoint)->session->endpoint; - default: - assert(false); - return NULL; - } -} - -void pn_ep_decref(pn_endpoint_t *endpoint) -{ - assert(endpoint->refcount > 0); - endpoint->refcount--; - if (endpoint->refcount == 0) { - pn_connection_t *conn = pni_ep_get_connection(endpoint); - pn_collector_put(conn->collector, PN_OBJECT, endpoint, pn_final_type(endpoint->type)); - } -} - -static void pni_endpoint_tini(pn_endpoint_t *endpoint) -{ - pn_error_free(endpoint->error); - pn_condition_tini(&endpoint->remote_condition); - pn_condition_tini(&endpoint->condition); -} - -static void pni_free_children(pn_list_t *children, pn_list_t *freed) -{ - while (pn_list_size(children) > 0) { - pn_endpoint_t *endpoint = (pn_endpoint_t *) pn_list_get(children, 0); - assert(!endpoint->referenced); - pn_free(endpoint); - } - - while (pn_list_size(freed) > 0) { - pn_endpoint_t *endpoint = (pn_endpoint_t *) pn_list_get(freed, 0); - assert(!endpoint->referenced); - pn_free(endpoint); - } - - pn_free(children); - pn_free(freed); -} - -static void pn_connection_finalize(void *object) -{ - pn_connection_t *conn = (pn_connection_t *) object; - pn_endpoint_t *endpoint = &conn->endpoint; - - if (conn->transport) { - assert(!conn->transport->referenced); - pn_free(conn->transport); - } - - // freeing the transport could post events - if (pn_refcount(conn) > 0) { - return; - } - - pni_free_children(conn->sessions, conn->freed); - pn_free(conn->context); - pn_decref(conn->collector); - - pn_free(conn->container); - pn_free(conn->hostname); - pn_free(conn->auth_user); - pn_free(conn->auth_password); - pn_free(conn->offered_capabilities); - pn_free(conn->desired_capabilities); - pn_free(conn->properties); - pni_endpoint_tini(endpoint); - pn_free(conn->delivery_pool); -} - -#define pn_connection_initialize NULL -#define pn_connection_hashcode NULL -#define pn_connection_compare NULL -#define pn_connection_inspect NULL - -pn_connection_t *pn_connection(void) -{ - static const pn_class_t clazz = PN_CLASS(pn_connection); - pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t)); - if (!conn) return NULL; - - conn->endpoint_head = NULL; - conn->endpoint_tail = NULL; - pn_endpoint_init(&conn->endpoint, CONNECTION, conn); - conn->transport_head = NULL; - conn->transport_tail = NULL; - conn->sessions = pn_list(PN_WEAKREF, 0); - conn->freed = pn_list(PN_WEAKREF, 0); - conn->transport = NULL; - conn->work_head = NULL; - conn->work_tail = NULL; - conn->tpwork_head = NULL; - conn->tpwork_tail = NULL; - conn->container = pn_string(NULL); - conn->hostname = pn_string(NULL); - conn->auth_user = pn_string(NULL); - conn->auth_password = pn_string(NULL); - conn->offered_capabilities = pn_data(0); - conn->desired_capabilities = pn_data(0); - conn->properties = pn_data(0); - conn->collector = NULL; - conn->context = pn_record(); - conn->delivery_pool = pn_list(PN_OBJECT, 0); - - return conn; -} - -static const pn_event_type_t endpoint_init_event_map[] = { - PN_CONNECTION_INIT, /* CONNECTION */ - PN_SESSION_INIT, /* SESSION */ - PN_LINK_INIT, /* SENDER */ - PN_LINK_INIT}; /* RECEIVER */ - -void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector) -{ - pn_decref(connection->collector); - connection->collector = collector; - pn_incref(connection->collector); - pn_endpoint_t *endpoint = connection->endpoint_head; - while (endpoint) { - pn_collector_put(connection->collector, PN_OBJECT, endpoint, endpoint_init_event_map[endpoint->type]); - endpoint = endpoint->endpoint_next; - } -} - -pn_state_t pn_connection_state(pn_connection_t *connection) -{ - return connection ? connection->endpoint.state : 0; -} - -pn_error_t *pn_connection_error(pn_connection_t *connection) -{ - return connection ? connection->endpoint.error : NULL; -} - -const char *pn_connection_get_container(pn_connection_t *connection) -{ - assert(connection); - return pn_string_get(connection->container); -} - -void pn_connection_set_container(pn_connection_t *connection, const char *container) -{ - assert(connection); - pn_string_set(connection->container, container); -} - -const char *pn_connection_get_hostname(pn_connection_t *connection) -{ - assert(connection); - return pn_string_get(connection->hostname); -} - -void pn_connection_set_hostname(pn_connection_t *connection, const char *hostname) -{ - assert(connection); - pn_string_set(connection->hostname, hostname); -} - -const char *pn_connection_get_user(pn_connection_t *connection) -{ - assert(connection); - return pn_string_get(connection->auth_user); -} - -void pn_connection_set_user(pn_connection_t *connection, const char *user) -{ - assert(connection); - pn_string_set(connection->auth_user, user); -} - -void pn_connection_set_password(pn_connection_t *connection, const char *password) -{ - assert(connection); - // Make sure the previous password is erased, if there was one. - size_t n = pn_string_size(connection->auth_password); - const char* s = pn_string_get(connection->auth_password); - if (n > 0 && s) memset((void*)s, 0, n); - pn_string_set(connection->auth_password, password); -} - -pn_data_t *pn_connection_offered_capabilities(pn_connection_t *connection) -{ - assert(connection); - return connection->offered_capabilities; -} - -pn_data_t *pn_connection_desired_capabilities(pn_connection_t *connection) -{ - assert(connection); - return connection->desired_capabilities; -} - -pn_data_t *pn_connection_properties(pn_connection_t *connection) -{ - assert(connection); - return connection->properties; -} - -pn_data_t *pn_connection_remote_offered_capabilities(pn_connection_t *connection) -{ - assert(connection); - return connection->transport ? connection->transport->remote_offered_capabilities : NULL; -} - -pn_data_t *pn_connection_remote_desired_capabilities(pn_connection_t *connection) -{ - assert(connection); - return connection->transport ? connection->transport->remote_desired_capabilities : NULL; -} - -pn_data_t *pn_connection_remote_properties(pn_connection_t *connection) -{ - assert(connection); - return connection->transport ? connection->transport->remote_properties : NULL; -} - -const char *pn_connection_remote_container(pn_connection_t *connection) -{ - assert(connection); - return connection->transport ? connection->transport->remote_container : NULL; -} - -const char *pn_connection_remote_hostname(pn_connection_t *connection) -{ - assert(connection); - return connection->transport ? connection->transport->remote_hostname : NULL; -} - -pn_delivery_t *pn_work_head(pn_connection_t *connection) -{ - assert(connection); - return connection->work_head; -} - -pn_delivery_t *pn_work_next(pn_delivery_t *delivery) -{ - assert(delivery); - - if (delivery->work) - return delivery->work_next; - else - return pn_work_head(delivery->link->session->connection); -} - -static void pni_add_work(pn_connection_t *connection, pn_delivery_t *delivery) -{ - if (!delivery->work) - { - assert(!delivery->local.settled); // never allow settled deliveries - LL_ADD(connection, work, delivery); - delivery->work = true; - } -} - -static void pni_clear_work(pn_connection_t *connection, pn_delivery_t *delivery) -{ - if (delivery->work) - { - LL_REMOVE(connection, work, delivery); - delivery->work = false; - } -} - -void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery) -{ - pn_link_t *link = pn_delivery_link(delivery); - pn_delivery_t *current = pn_link_current(link); - if (delivery->updated && !delivery->local.settled) { - pni_add_work(connection, delivery); - } else if (delivery == current) { - if (link->endpoint.type == SENDER) { - if (pn_link_credit(link) > 0) { - pni_add_work(connection, delivery); - } else { - pni_clear_work(connection, delivery); - } - } else { - pni_add_work(connection, delivery); - } - } else { - pni_clear_work(connection, delivery); - } -} - -static void pni_add_tpwork(pn_delivery_t *delivery) -{ - pn_connection_t *connection = delivery->link->session->connection; - if (!delivery->tpwork) - { - LL_ADD(connection, tpwork, delivery); - delivery->tpwork = true; - } - pn_modified(connection, &connection->endpoint, true); -} - -void pn_clear_tpwork(pn_delivery_t *delivery) -{ - pn_connection_t *connection = delivery->link->session->connection; - if (delivery->tpwork) - { - LL_REMOVE(connection, tpwork, delivery); - delivery->tpwork = false; - if (pn_refcount(delivery) > 0) { - pn_incref(delivery); - pn_decref(delivery); - } - } -} - -void pn_dump(pn_connection_t *conn) -{ - pn_endpoint_t *endpoint = conn->transport_head; - while (endpoint) - { - printf("%p", (void *) endpoint); - endpoint = endpoint->transport_next; - if (endpoint) - printf(" -> "); - } - printf("\n"); -} - -void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit) -{ - if (!endpoint->modified) { - LL_ADD(connection, transport, endpoint); - endpoint->modified = true; - } - - if (emit && connection->transport) { - pn_collector_put(connection->collector, PN_OBJECT, connection->transport, - PN_TRANSPORT); - } -} - -void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint) -{ - if (endpoint->modified) { - LL_REMOVE(connection, transport, endpoint); - endpoint->transport_next = NULL; - endpoint->transport_prev = NULL; - endpoint->modified = false; - } -} - -static bool pni_matches(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state) -{ - if (endpoint->type != type) return false; - - if (!state) return true; - - int st = endpoint->state; - if ((state & PN_REMOTE_MASK) == 0 || (state & PN_LOCAL_MASK) == 0) - return st & state; - else - return st == state; -} - -pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state) -{ - while (endpoint) - { - if (pni_matches(endpoint, type, state)) - return endpoint; - endpoint = endpoint->endpoint_next; - } - return NULL; -} - -pn_session_t *pn_session_head(pn_connection_t *conn, pn_state_t state) -{ - if (conn) - return (pn_session_t *) pn_find(conn->endpoint_head, SESSION, state); - else - return NULL; -} - -pn_session_t *pn_session_next(pn_session_t *ssn, pn_state_t state) -{ - if (ssn) - return (pn_session_t *) pn_find(ssn->endpoint.endpoint_next, SESSION, state); - else - return NULL; -} - -pn_link_t *pn_link_head(pn_connection_t *conn, pn_state_t state) -{ - if (!conn) return NULL; - - pn_endpoint_t *endpoint = conn->endpoint_head; - - while (endpoint) - { - if (pni_matches(endpoint, SENDER, state) || pni_matches(endpoint, RECEIVER, state)) - return (pn_link_t *) endpoint; - endpoint = endpoint->endpoint_next; - } - - return NULL; -} - -pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state) -{ - if (!link) return NULL; - - pn_endpoint_t *endpoint = link->endpoint.endpoint_next; - - while (endpoint) - { - if (pni_matches(endpoint, SENDER, state) || pni_matches(endpoint, RECEIVER, state)) - return (pn_link_t *) endpoint; - endpoint = endpoint->endpoint_next; - } - - return NULL; -} - -static void pn_session_incref(void *object) -{ - pn_session_t *session = (pn_session_t *) object; - if (!session->endpoint.referenced) { - session->endpoint.referenced = true; - pn_incref(session->connection); - } else { - pn_object_incref(object); - } -} - -static bool pn_ep_bound(pn_endpoint_t *endpoint) -{ - pn_connection_t *conn = pni_ep_get_connection(endpoint); - pn_session_t *ssn; - pn_link_t *lnk; - - if (!conn->transport) return false; - if (endpoint->modified) return true; - - switch (endpoint->type) { - case CONNECTION: - return ((pn_connection_t *)endpoint)->transport; - case SESSION: - ssn = (pn_session_t *) endpoint; - return (((int16_t) ssn->state.local_channel) >= 0 || ((int16_t) ssn->state.remote_channel) >= 0); - case SENDER: - case RECEIVER: - lnk = (pn_link_t *) endpoint; - return ((int32_t) lnk->state.local_handle) >= 0 || ((int32_t) lnk->state.remote_handle) >= 0; - default: - assert(false); - return false; - } -} - -static bool pni_connection_live(pn_connection_t *conn) { - return pn_refcount(conn) > 1; -} - -static bool pni_session_live(pn_session_t *ssn) { - return pni_connection_live(ssn->connection) || pn_refcount(ssn) > 1; -} - -static bool pni_link_live(pn_link_t *link) { - return pni_session_live(link->session) || pn_refcount(link) > 1; -} - -static bool pni_endpoint_live(pn_endpoint_t *endpoint) { - switch (endpoint->type) { - case CONNECTION: - return pni_connection_live((pn_connection_t *)endpoint); - case SESSION: - return pni_session_live((pn_session_t *) endpoint); - case SENDER: - case RECEIVER: - return pni_link_live((pn_link_t *) endpoint); - default: - assert(false); - return false; - } -} - -static bool pni_preserve_child(pn_endpoint_t *endpoint) -{ - pn_connection_t *conn = pni_ep_get_connection(endpoint); - pn_endpoint_t *parent = pn_ep_parent(endpoint); - if (pni_endpoint_live(parent) && (!endpoint->freed || (pn_ep_bound(endpoint))) - && endpoint->referenced) { - pn_object_incref(endpoint); - endpoint->referenced = false; - pn_decref(parent); - return true; - } else { - LL_REMOVE(conn, transport, endpoint); - return false; - } -} - -static void pn_session_finalize(void *object) -{ - pn_session_t *session = (pn_session_t *) object; - pn_endpoint_t *endpoint = &session->endpoint; - - if (pni_preserve_child(endpoint)) { - return; - } - - pn_free(session->context); - pni_free_children(session->links, session->freed); - pni_endpoint_tini(endpoint); - pn_delivery_map_free(&session->state.incoming); - pn_delivery_map_free(&session->state.outgoing); - pn_free(session->state.local_handles); - pn_free(session->state.remote_handles); - pni_remove_session(session->connection, session); - pn_list_remove(session->connection->freed, session); - - if (session->connection->transport) { - pn_transport_t *transport = session->connection->transport; - pn_hash_del(transport->local_channels, session->state.local_channel); - pn_hash_del(transport->remote_channels, session->state.remote_channel); - } - - if (endpoint->referenced) { - pn_decref(session->connection); - } -} - -#define pn_session_new pn_object_new -#define pn_session_refcount pn_object_refcount -#define pn_session_decref pn_object_decref -#define pn_session_reify pn_object_reify -#define pn_session_initialize NULL -#define pn_session_hashcode NULL -#define pn_session_compare NULL -#define pn_session_inspect NULL - -pn_session_t *pn_session(pn_connection_t *conn) -{ - assert(conn); - - - pn_transport_t * transport = pn_connection_transport(conn); - - if(transport) { - // channel_max is an index, not a count. - if(pn_hash_size(transport->local_channels) > (size_t)transport->channel_max) { - pn_transport_logf(transport, - "pn_session: too many sessions: %d channel_max is %d", - pn_hash_size(transport->local_channels), - transport->channel_max); - return (pn_session_t *) 0; - } - } - -#define pn_session_free pn_object_free - static const pn_class_t clazz = PN_METACLASS(pn_session); -#undef pn_session_free - pn_session_t *ssn = (pn_session_t *) pn_class_new(&clazz, sizeof(pn_session_t)); - if (!ssn) return NULL; - pn_endpoint_init(&ssn->endpoint, SESSION, conn); - pni_add_session(conn, ssn); - ssn->links = pn_list(PN_WEAKREF, 0); - ssn->freed = pn_list(PN_WEAKREF, 0); - ssn->context = pn_record(); - ssn->incoming_capacity = 1024*1024; - ssn->incoming_bytes = 0; - ssn->outgoing_bytes = 0; - ssn->incoming_deliveries = 0; - ssn->outgoing_deliveries = 0; - ssn->outgoing_window = 2147483647; - - // begin transport state - memset(&ssn->state, 0, sizeof(ssn->state)); - ssn->state.local_channel = (uint16_t)-1; - ssn->state.remote_channel = (uint16_t)-1; - pn_delivery_map_init(&ssn->state.incoming, 0); - pn_delivery_map_init(&ssn->state.outgoing, 0); - ssn->state.local_handles = pn_hash(PN_WEAKREF, 0, 0.75); - ssn->state.remote_handles = pn_hash(PN_WEAKREF, 0, 0.75); - // end transport state - - pn_collector_put(conn->collector, PN_OBJECT, ssn, PN_SESSION_INIT); - if (conn->transport) { - pni_session_bound(ssn); - } - pn_decref(ssn); - return ssn; -} - -static void pni_session_bound(pn_session_t *ssn) -{ - assert(ssn); - size_t nlinks = pn_list_size(ssn->links); - for (size_t i = 0; i < nlinks; i++) { - pni_link_bound((pn_link_t *) pn_list_get(ssn->links, i)); - } -} - -void pn_session_unbound(pn_session_t* ssn) -{ - assert(ssn); - ssn->state.local_channel = (uint16_t)-1; - ssn->state.remote_channel = (uint16_t)-1; - ssn->incoming_bytes = 0; - ssn->outgoing_bytes = 0; - ssn->incoming_deliveries = 0; - ssn->outgoing_deliveries = 0; -} - -size_t pn_session_get_incoming_capacity(pn_session_t *ssn) -{ - assert(ssn); - return ssn->incoming_capacity; -} - -void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity) -{ - assert(ssn); - // XXX: should this trigger a flow? - ssn->incoming_capacity = capacity; -} - -size_t pn_session_get_outgoing_window(pn_session_t *ssn) -{ - assert(ssn); - return ssn->outgoing_window; -} - -void pn_session_set_outgoing_window(pn_session_t *ssn, size_t window) -{ - assert(ssn); - ssn->outgoing_window = window; -} - -size_t pn_session_outgoing_bytes(pn_session_t *ssn) -{ - assert(ssn); - return ssn->outgoing_bytes; -} - -size_t pn_session_incoming_bytes(pn_session_t *ssn) -{ - assert(ssn); - return ssn->incoming_bytes; -} - -pn_state_t pn_session_state(pn_session_t *session) -{ - return session->endpoint.state; -} - -pn_error_t *pn_session_error(pn_session_t *session) -{ - return session->endpoint.error; -} - -static void pni_terminus_init(pn_terminus_t *terminus, pn_terminus_type_t type) -{ - terminus->type = type; - terminus->address = pn_string(NULL); - terminus->durability = PN_NONDURABLE; - terminus->expiry_policy = PN_EXPIRE_WITH_SESSION; - terminus->timeout = 0; - terminus->dynamic = false; - terminus->distribution_mode = PN_DIST_MODE_UNSPECIFIED; - terminus->properties = pn_data(0); - terminus->capabilities = pn_data(0); - terminus->outcomes = pn_data(0); - terminus->filter = pn_data(0); -} - -static void pn_link_incref(void *object) -{ - pn_link_t *link = (pn_link_t *) object; - if (!link->endpoint.referenced) { - link->endpoint.referenced = true; - pn_incref(link->session); - } else { - pn_object_incref(object); - } -} - -static void pn_link_finalize(void *object) -{ - pn_link_t *link = (pn_link_t *) object; - pn_endpoint_t *endpoint = &link->endpoint; - - if (pni_preserve_child(endpoint)) { - return; - } - - while (link->unsettled_head) { - assert(!link->unsettled_head->referenced); - pn_free(link->unsettled_head); - } - - pn_free(link->context); - pni_terminus_free(&link->source); - pni_terminus_free(&link->target); - pni_terminus_free(&link->remote_source); - pni_terminus_free(&link->remote_target); - pn_free(link->name); - pni_endpoint_tini(endpoint); - pni_remove_link(link->session, link); - pn_hash_del(link->session->state.local_handles, link->state.local_handle); - pn_hash_del(link->session->state.remote_handles, link->state.remote_handle); - pn_list_remove(link->session->freed, link); - if (endpoint->referenced) { - pn_decref(link->session); - } -} - -#define pn_link_refcount pn_object_refcount -#define pn_link_decref pn_object_decref -#define pn_link_reify pn_object_reify -#define pn_link_initialize NULL -#define pn_link_hashcode NULL -#define pn_link_compare NULL -#define pn_link_inspect NULL - -pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) -{ -#define pn_link_new pn_object_new -#define pn_link_free pn_object_free - static const pn_class_t clazz = PN_METACLASS(pn_link); -#undef pn_link_new -#undef pn_link_free - pn_link_t *link = (pn_link_t *) pn_class_new(&clazz, sizeof(pn_link_t)); - - pn_endpoint_init(&link->endpoint, type, session->connection); - pni_add_link(session, link); - pn_incref(session); // keep session until link finalized - link->name = pn_string(name); - pni_terminus_init(&link->source, PN_SOURCE); - pni_terminus_init(&link->target, PN_TARGET); - pni_terminus_init(&link->remote_source, PN_UNSPECIFIED); - pni_terminus_init(&link->remote_target, PN_UNSPECIFIED); - link->unsettled_head = link->unsettled_tail = link->current = NULL; - link->unsettled_count = 0; - link->available = 0; - link->credit = 0; - link->queued = 0; - link->drain = false; - link->drain_flag_mode = true; - link->drained = 0; - link->context = pn_record(); - link->snd_settle_mode = PN_SND_MIXED; - link->rcv_settle_mode = PN_RCV_FIRST; - link->remote_snd_settle_mode = PN_SND_MIXED; - link->remote_rcv_settle_mode = PN_RCV_FIRST; - link->detached = false; - - // begin transport state - link->state.local_handle = -1; - link->state.remote_handle = -1; - link->state.delivery_count = 0; - link->state.link_credit = 0; - // end transport state - - pn_collector_put(session->connection->collector, PN_OBJECT, link, PN_LINK_INIT); - if (session->connection->transport) { - pni_link_bound(link); - } - pn_decref(link); - return link; -} - -static void pni_link_bound(pn_link_t *link) -{ -} - -void pn_link_unbound(pn_link_t* link) -{ - assert(link); - link->state.local_handle = -1; - link->state.remote_handle = -1; - link->state.delivery_count = 0; - link->state.link_credit = 0; -} - -pn_terminus_t *pn_link_source(pn_link_t *link) -{ - return link ? &link->source : NULL; -} - -pn_terminus_t *pn_link_target(pn_link_t *link) -{ - return link ? &link->target : NULL; -} - -pn_terminus_t *pn_link_remote_source(pn_link_t *link) -{ - return link ? &link->remote_source : NULL; -} - -pn_terminus_t *pn_link_remote_target(pn_link_t *link) -{ - return link ? &link->remote_target : NULL; -} - -int pn_terminus_set_type(pn_terminus_t *terminus, pn_terminus_type_t type) -{ - if (!terminus) return PN_ARG_ERR; - terminus->type = type; - return 0; -} - -pn_terminus_type_t pn_terminus_get_type(pn_terminus_t *terminus) -{ - return terminus ? terminus->type : (pn_terminus_type_t) 0; -} - -const char *pn_terminus_get_address(pn_terminus_t *terminus) -{ - assert(terminus); - return pn_string_get(terminus->address); -} - -int pn_terminus_set_address(pn_terminus_t *terminus, const char *address) -{ - assert(terminus); - return pn_string_set(terminus->address, address); -} - -pn_durability_t pn_terminus_get_durability(pn_terminus_t *terminus) -{ - return terminus ? terminus->durability : (pn_durability_t) 0; -} - -int pn_terminus_set_durability(pn_terminus_t *terminus, pn_durability_t durability) -{ - if (!terminus) return PN_ARG_ERR; - terminus->durability = durability; - return 0; -} - -pn_expiry_policy_t pn_terminus_get_expiry_policy(pn_terminus_t *terminus) -{ - return terminus ? terminus->expiry_policy : (pn_expiry_policy_t) 0; -} - -int pn_terminus_set_expiry_policy(pn_terminus_t *terminus, pn_expiry_policy_t expiry_policy) -{ - if (!terminus) return PN_ARG_ERR; - terminus->expiry_policy = expiry_policy; - return 0; -} - -pn_seconds_t pn_terminus_get_timeout(pn_terminus_t *terminus) -{ - return terminus ? terminus->timeout : 0; -} - -int pn_terminus_set_timeout(pn_terminus_t *terminus, pn_seconds_t timeout) -{ - if (!terminus) return PN_ARG_ERR; - terminus->timeout = timeout; - return 0; -} - -bool pn_terminus_is_dynamic(pn_terminus_t *terminus) -{ - return terminus ? terminus->dynamic : false; -} - -int pn_terminus_set_dynamic(pn_terminus_t *terminus, bool dynamic) -{ - if (!terminus) return PN_ARG_ERR; - terminus->dynamic = dynamic; - return 0; -} - -pn_data_t *pn_terminus_properties(pn_terminus_t *terminus) -{ - return terminus ? terminus->properties : NULL; -} - -pn_data_t *pn_terminus_capabilities(pn_terminus_t *terminus) -{ - return terminus ? terminus->capabilities : NULL; -} - -pn_data_t *pn_terminus_outcomes(pn_terminus_t *terminus) -{ - return terminus ? terminus->outcomes : NULL; -} - -pn_data_t *pn_terminus_filter(pn_terminus_t *terminus) -{ - return terminus ? terminus->filter : NULL; -} - -pn_distribution_mode_t pn_terminus_get_distribution_mode(const pn_terminus_t *terminus) -{ - return terminus ? terminus->distribution_mode : PN_DIST_MODE_UNSPECIFIED; -} - -int pn_terminus_set_distribution_mode(pn_terminus_t *terminus, pn_distribution_mode_t m) -{ - if (!terminus) return PN_ARG_ERR; - terminus->distribution_mode = m; - return 0; -} - -int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src) -{ - if (!terminus || !src) { - return PN_ARG_ERR; - } - - terminus->type = src->type; - int err = pn_terminus_set_address(terminus, pn_terminus_get_address(src)); - if (err) return err; - terminus->durability = src->durability; - terminus->expiry_policy = src->expiry_policy; - terminus->timeout = src->timeout; - terminus->dynamic = src->dynamic; - terminus->distribution_mode = src->distribution_mode; - err = pn_data_copy(terminus->properties, src->properties); - if (err) return err; - err = pn_data_copy(terminus->capabilities, src->capabilities); - if (err) return err; - err = pn_data_copy(terminus->outcomes, src->outcomes); - if (err) return err; - err = pn_data_copy(terminus->filter, src->filter); - if (err) return err; - return 0; -} - -pn_link_t *pn_sender(pn_session_t *session, const char *name) -{ - return pn_link_new(SENDER, session, name); -} - -pn_link_t *pn_receiver(pn_session_t *session, const char *name) -{ - return pn_link_new(RECEIVER, session, name); -} - -pn_state_t pn_link_state(pn_link_t *link) -{ - return link->endpoint.state; -} - -pn_error_t *pn_link_error(pn_link_t *link) -{ - return link->endpoint.error; -} - -const char *pn_link_name(pn_link_t *link) -{ - assert(link); - return pn_string_get(link->name); -} - -bool pn_link_is_sender(pn_link_t *link) -{ - return link->endpoint.type == SENDER; -} - -bool pn_link_is_receiver(pn_link_t *link) -{ - return link->endpoint.type == RECEIVER; -} - -pn_session_t *pn_link_session(pn_link_t *link) -{ - assert(link); - return link->session; -} - -static void pn_disposition_finalize(pn_disposition_t *ds) -{ - pn_free(ds->data); - pn_free(ds->annotations); - pn_condition_tini(&ds->condition); -} - -static void pn_delivery_incref(void *object) -{ - pn_delivery_t *delivery = (pn_delivery_t *) object; - if (delivery->link && !delivery->referenced) { - delivery->referenced = true; - pn_incref(delivery->link); - } else { - pn_object_incref(object); - } -} - -static bool pni_preserve_delivery(pn_delivery_t *delivery) -{ - pn_connection_t *conn = delivery->link->session->connection; - return !delivery->local.settled || (conn->transport && (delivery->state.init || delivery->tpwork)); -} - -static void pn_delivery_finalize(void *object) -{ - pn_delivery_t *delivery = (pn_delivery_t *) object; - pn_link_t *link = delivery->link; - // assert(!delivery->state.init); - - bool pooled = false; - bool referenced = true; - if (link) { - if (pni_link_live(link) && pni_preserve_delivery(delivery) && delivery->referenced) { - delivery->referenced = false; - pn_object_incref(delivery); - pn_decref(link); - return; - } - referenced = delivery->referenced; - - pn_clear_tpwork(delivery); - LL_REMOVE(link, unsettled, delivery); - pn_delivery_map_del(pn_link_is_sender(link) - ? &link->session->state.outgoing - : &link->session->state.incoming, - delivery); - pn_buffer_clear(delivery->tag); - pn_buffer_clear(delivery->bytes); - pn_record_clear(delivery->context); - delivery->settled = true; - pn_connection_t *conn = link->session->connection; - assert(pn_refcount(delivery) == 0); - if (pni_connection_live(conn)) { - pn_list_t *pool = link->session->connection->delivery_pool; - delivery->link = NULL; - pn_list_add(pool, delivery); - pooled = true; - assert(pn_refcount(delivery) == 1); - } - } - - if (!pooled) { - pn_free(delivery->context); - pn_buffer_free(delivery->tag); - pn_buffer_free(delivery->bytes); - pn_disposition_finalize(&delivery->local); - pn_disposition_finalize(&delivery->remote); - } - - if (referenced) { - pn_decref(link); - } -} - -static void pn_disposition_init(pn_disposition_t *ds) -{ - ds->data = pn_data(0); - ds->annotations = pn_data(0); - pn_condition_init(&ds->condition); -} - -static void pn_disposition_clear(pn_disposition_t *ds) -{ - ds->type = 0; - ds->section_number = 0; - ds->section_offset = 0; - ds->failed = false; - ds->undeliverable = false; - ds->settled = false; - pn_data_clear(ds->data); - pn_data_clear(ds->annotations); - pn_condition_clear(&ds->condition); -} - -#define pn_delivery_new pn_object_new -#define pn_delivery_refcount pn_object_refcount -#define pn_delivery_decref pn_object_decref -#define pn_delivery_free pn_object_free -#define pn_delivery_reify pn_object_reify -#define pn_delivery_initialize NULL -#define pn_delivery_hashcode NULL -#define pn_delivery_compare NULL -#define pn_delivery_inspect NULL - -pn_delivery_tag_t pn_dtag(const char *bytes, size_t size) { - pn_delivery_tag_t dtag = {size, bytes}; - return dtag; -} - -pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag) -{ - assert(link); - pn_list_t *pool = link->session->connection->delivery_pool; - pn_delivery_t *delivery = (pn_delivery_t *) pn_list_pop(pool); - if (!delivery) { - static const pn_class_t clazz = PN_METACLASS(pn_delivery); - delivery = (pn_delivery_t *) pn_class_new(&clazz, sizeof(pn_delivery_t)); - if (!delivery) return NULL; - delivery->tag = pn_buffer(16); - delivery->bytes = pn_buffer(64); - pn_disposition_init(&delivery->local); - pn_disposition_init(&delivery->remote); - delivery->context = pn_record(); - } else { - assert(!delivery->state.init); - } - delivery->link = link; - pn_incref(delivery->link); // keep link until finalized - pn_buffer_clear(delivery->tag); - pn_buffer_append(delivery->tag, tag.start, tag.size); - pn_disposition_clear(&delivery->local); - pn_disposition_clear(&delivery->remote); - delivery->updated = false; - delivery->settled = false; - LL_ADD(link, unsettled, delivery); - delivery->referenced = true; - delivery->work_next = NULL; - delivery->work_prev = NULL; - delivery->work = false; - delivery->tpwork_next = NULL; - delivery->tpwork_prev = NULL; - delivery->tpwork = false; - pn_buffer_clear(delivery->bytes); - delivery->done = false; - pn_record_clear(delivery->context); - - // begin delivery state - delivery->state.init = false; - delivery->state.sent = false; - // end delivery state - - if (!link->current) - link->current = delivery; - - link->unsettled_count++; - - pn_work_update(link->session->connection, delivery); - - // XXX: could just remove incref above - pn_decref(delivery); - - return delivery; -} - -bool pn_delivery_buffered(pn_delivery_t *delivery) -{ - assert(delivery); - if (delivery->settled) return false; - if (pn_link_is_sender(delivery->link)) { - pn_delivery_state_t *state = &delivery->state; - if (state->sent) { - return false; - } else { - return delivery->done || (pn_buffer_size(delivery->bytes) > 0); - } - } else { - return false; - } -} - -int pn_link_unsettled(pn_link_t *link) -{ - return link->unsettled_count; -} - -pn_delivery_t *pn_unsettled_head(pn_link_t *link) -{ - pn_delivery_t *d = link->unsettled_head; - while (d && d->local.settled) { - d = d->unsettled_next; - } - return d; -} - -pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery) -{ - pn_delivery_t *d = delivery->unsettled_next; - while (d && d->local.settled) { - d = d->unsettled_next; - } - return d; -} - -bool pn_delivery_current(pn_delivery_t *delivery) -{ - pn_link_t *link = delivery->link; - return pn_link_current(link) == delivery; -} - -void pn_delivery_dump(pn_delivery_t *d) -{ - char tag[1024]; - pn_bytes_t bytes = pn_buffer_bytes(d->tag); - pn_quote_data(tag, 1024, bytes.start, bytes.size); - printf("{tag=%s, local.type=%" PRIu64 ", remote.type=%" PRIu64 ", local.settled=%u, " - "remote.settled=%u, updated=%u, current=%u, writable=%u, readable=%u, " - "work=%u}", - tag, d->local.type, d->remote.type, d->local.settled, - d->remote.settled, d->updated, pn_delivery_current(d), - pn_delivery_writable(d), pn_delivery_readable(d), d->work); -} - -void *pn_delivery_get_context(pn_delivery_t *delivery) -{ - assert(delivery); - return pn_record_get(delivery->context, PN_LEGCTX); -} - -void pn_delivery_set_context(pn_delivery_t *delivery, void *context) -{ - assert(delivery); - pn_record_set(delivery->context, PN_LEGCTX, context); -} - -pn_record_t *pn_delivery_attachments(pn_delivery_t *delivery) -{ - assert(delivery); - return delivery->context; -} - -uint64_t pn_disposition_type(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->type; -} - -pn_data_t *pn_disposition_data(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->data; -} - -uint32_t pn_disposition_get_section_number(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->section_number; -} - -void pn_disposition_set_section_number(pn_disposition_t *disposition, uint32_t section_number) -{ - assert(disposition); - disposition->section_number = section_number; -} - -uint64_t pn_disposition_get_section_offset(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->section_offset; -} - -void pn_disposition_set_section_offset(pn_disposition_t *disposition, uint64_t section_offset) -{ - assert(disposition); - disposition->section_offset = section_offset; -} - -bool pn_disposition_is_failed(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->failed; -} - -void pn_disposition_set_failed(pn_disposition_t *disposition, bool failed) -{ - assert(disposition); - disposition->failed = failed; -} - -bool pn_disposition_is_undeliverable(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->undeliverable; -} - -void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeliverable) -{ - assert(disposition); - disposition->undeliverable = undeliverable; -} - -pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->annotations; -} - -pn_condition_t *pn_disposition_condition(pn_disposition_t *disposition) -{ - assert(disposition); - return &disposition->condition; -} - -pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery) -{ - if (delivery) { - pn_bytes_t tag = pn_buffer_bytes(delivery->tag); - return pn_dtag(tag.start, tag.size); - } else { - return pn_dtag(0, 0); - } -} - -pn_delivery_t *pn_link_current(pn_link_t *link) -{ - if (!link) return NULL; - return link->current; -} - -static void pni_advance_sender(pn_link_t *link) -{ - link->current->done = true; - link->queued++; - link->credit--; - link->session->outgoing_deliveries++; - pni_add_tpwork(link->current); - link->current = link->current->unsettled_next; -} - -static void pni_advance_receiver(pn_link_t *link) -{ - link->credit--; - link->queued--; - link->session->incoming_deliveries--; - - pn_delivery_t *current = link->current; - link->session->incoming_bytes -= pn_buffer_size(current->bytes); - pn_buffer_clear(current->bytes); - - if (!link->session->state.incoming_window) { - pni_add_tpwork(current); - } - - link->current = link->current->unsettled_next; -} - -bool pn_link_advance(pn_link_t *link) -{ - if (link && link->current) { - pn_delivery_t *prev = link->current; - if (link->endpoint.type == SENDER) { - pni_advance_sender(link); - } else { - pni_advance_receiver(link); - } - pn_delivery_t *next = link->current; - pn_work_update(link->session->connection, prev); - if (next) pn_work_update(link->session->connection, next); - return prev != next; - } else { - return false; - } -} - -int pn_link_credit(pn_link_t *link) -{ - return link ? link->credit : 0; -} - -int pn_link_available(pn_link_t *link) -{ - return link ? link->available : 0; -} - -int pn_link_queued(pn_link_t *link) -{ - return link ? link->queued : 0; -} - -int pn_link_remote_credit(pn_link_t *link) -{ - assert(link); - return link->credit - link->queued; -} - -bool pn_link_get_drain(pn_link_t *link) -{ - assert(link); - return link->drain; -} - -pn_snd_settle_mode_t pn_link_snd_settle_mode(pn_link_t *link) -{ - return link ? (pn_snd_settle_mode_t)link->snd_settle_mode - : PN_SND_MIXED; -} - -pn_rcv_settle_mode_t pn_link_rcv_settle_mode(pn_link_t *link) -{ - return link ? (pn_rcv_settle_mode_t)link->rcv_settle_mode - : PN_RCV_FIRST; -} - -pn_snd_settle_mode_t pn_link_remote_snd_settle_mode(pn_link_t *link) -{ - return link ? (pn_snd_settle_mode_t)link->remote_snd_settle_mode - : PN_SND_MIXED; -} - -pn_rcv_settle_mode_t pn_link_remote_rcv_settle_mode(pn_link_t *link) -{ - return link ? (pn_rcv_settle_mode_t)link->remote_rcv_settle_mode - : PN_RCV_FIRST; -} -void pn_link_set_snd_settle_mode(pn_link_t *link, pn_snd_settle_mode_t mode) -{ - if (link) - link->snd_settle_mode = (uint8_t)mode; -} -void pn_link_set_rcv_settle_mode(pn_link_t *link, pn_rcv_settle_mode_t mode) -{ - if (link) - link->rcv_settle_mode = (uint8_t)mode; -} - -void pn_delivery_settle(pn_delivery_t *delivery) -{ - assert(delivery); - if (!delivery->local.settled) { - pn_link_t *link = delivery->link; - if (pn_delivery_current(delivery)) { - pn_link_advance(link); - } - - link->unsettled_count--; - delivery->local.settled = true; - pni_add_tpwork(delivery); - pn_work_update(delivery->link->session->connection, delivery); - pn_incref(delivery); - pn_decref(delivery); - } -} - -void pn_link_offered(pn_link_t *sender, int credit) -{ - sender->available = credit; -} - -ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n) -{ - pn_delivery_t *current = pn_link_current(sender); - if (!current) return PN_EOS; - if (!bytes || !n) return 0; - pn_buffer_append(current->bytes, bytes, n); - sender->session->outgoing_bytes += n; - pni_add_tpwork(current); - return n; -} - -int pn_link_drained(pn_link_t *link) -{ - assert(link); - int drained = 0; - - if (pn_link_is_sender(link)) { - if (link->drain && link->credit > 0) { - link->drained = link->credit; - link->credit = 0; - pn_modified(link->session->connection, &link->endpoint, true); - drained = link->drained; - } - } else { - drained = link->drained; - link->drained = 0; - } - - return drained; -} - -ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n) -{ - if (!receiver) return PN_ARG_ERR; - - pn_delivery_t *delivery = receiver->current; - if (delivery) { - size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes); - pn_buffer_trim(delivery->bytes, size, 0); - if (size) { - receiver->session->incoming_bytes -= size; - if (!receiver->session->state.incoming_window) { - pni_add_tpwork(delivery); - } - return size; - } else { - return delivery->done ? PN_EOS : 0; - } - } else { - return PN_STATE_ERR; - } -} - -void pn_link_flow(pn_link_t *receiver, int credit) -{ - assert(receiver); - assert(pn_link_is_receiver(receiver)); - receiver->credit += credit; - pn_modified(receiver->session->connection, &receiver->endpoint, true); - if (!receiver->drain_flag_mode) { - pn_link_set_drain(receiver, false); - receiver->drain_flag_mode = false; - } -} - -void pn_link_drain(pn_link_t *receiver, int credit) -{ - assert(receiver); - assert(pn_link_is_receiver(receiver)); - pn_link_set_drain(receiver, true); - pn_link_flow(receiver, credit); - receiver->drain_flag_mode = false; -} - -void pn_link_set_drain(pn_link_t *receiver, bool drain) -{ - assert(receiver); - assert(pn_link_is_receiver(receiver)); - receiver->drain = drain; - pn_modified(receiver->session->connection, &receiver->endpoint, true); - receiver->drain_flag_mode = true; -} - -bool pn_link_draining(pn_link_t *receiver) -{ - assert(receiver); - assert(pn_link_is_receiver(receiver)); - return receiver->drain && (pn_link_credit(receiver) > pn_link_queued(receiver)); -} - -pn_link_t *pn_delivery_link(pn_delivery_t *delivery) -{ - assert(delivery); - return delivery->link; -} - -pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery) -{ - assert(delivery); - return &delivery->local; -} - -uint64_t pn_delivery_local_state(pn_delivery_t *delivery) -{ - assert(delivery); - return delivery->local.type; -} - -pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery) -{ - assert(delivery); - return &delivery->remote; -} - -uint64_t pn_delivery_remote_state(pn_delivery_t *delivery) -{ - assert(delivery); - return delivery->remote.type; -} - -bool pn_delivery_settled(pn_delivery_t *delivery) -{ - return delivery ? delivery->remote.settled : false; -} - -bool pn_delivery_updated(pn_delivery_t *delivery) -{ - return delivery ? delivery->updated : false; -} - -void pn_delivery_clear(pn_delivery_t *delivery) -{ - delivery->updated = false; - pn_work_update(delivery->link->session->connection, delivery); -} - -void pn_delivery_update(pn_delivery_t *delivery, uint64_t state) -{ - if (!delivery) return; - delivery->local.type = state; - pni_add_tpwork(delivery); -} - -bool pn_delivery_writable(pn_delivery_t *delivery) -{ - if (!delivery) return false; - - pn_link_t *link = delivery->link; - return pn_link_is_sender(link) && pn_delivery_current(delivery) && pn_link_credit(link) > 0; -} - -bool pn_delivery_readable(pn_delivery_t *delivery) -{ - if (delivery) { - pn_link_t *link = delivery->link; - return pn_link_is_receiver(link) && pn_delivery_current(delivery); - } else { - return false; - } -} - -size_t pn_delivery_pending(pn_delivery_t *delivery) -{ - return pn_buffer_size(delivery->bytes); -} - -bool pn_delivery_partial(pn_delivery_t *delivery) -{ - return !delivery->done; -} - -pn_condition_t *pn_connection_condition(pn_connection_t *connection) -{ - assert(connection); - return &connection->endpoint.condition; -} - -pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection) -{ - assert(connection); - pn_transport_t *transport = connection->transport; - return transport ? &transport->remote_condition : NULL; -} - -pn_condition_t *pn_session_condition(pn_session_t *session) -{ - assert(session); - return &session->endpoint.condition; -} - -pn_condition_t *pn_session_remote_condition(pn_session_t *session) -{ - assert(session); - return &session->endpoint.remote_condition; -} - -pn_condition_t *pn_link_condition(pn_link_t *link) -{ - assert(link); - return &link->endpoint.condition; -} - -pn_condition_t *pn_link_remote_condition(pn_link_t *link) -{ - assert(link); - return &link->endpoint.remote_condition; -} - -bool pn_condition_is_set(pn_condition_t *condition) -{ - return condition && pn_string_get(condition->name); -} - -void pn_condition_clear(pn_condition_t *condition) -{ - assert(condition); - pn_string_clear(condition->name); - pn_string_clear(condition->description); - pn_data_clear(condition->info); -} - -const char *pn_condition_get_name(pn_condition_t *condition) -{ - assert(condition); - return pn_string_get(condition->name); -} - -int pn_condition_set_name(pn_condition_t *condition, const char *name) -{ - assert(condition); - return pn_string_set(condition->name, name); -} - -const char *pn_condition_get_description(pn_condition_t *condition) -{ - assert(condition); - return pn_string_get(condition->description); -} - -int pn_condition_set_description(pn_condition_t *condition, const char *description) -{ - assert(condition); - return pn_string_set(condition->description, description); -} - -int pn_condition_vformat(pn_condition_t *condition, const char *name, const char *fmt, va_list ap) -{ - assert(condition); - int err = pn_condition_set_name(condition, name); - if (err) - return err; - - char text[1024]; - size_t n = vsnprintf(text, 1024, fmt, ap); - if (n >= sizeof(text)) - text[sizeof(text)-1] = '\0'; - err = pn_condition_set_description(condition, text); - return err; -} - -int pn_condition_format(pn_condition_t *condition, const char *name, const char *fmt, ...) -{ - assert(condition); - va_list ap; - va_start(ap, fmt); - int err = pn_condition_vformat(condition, name, fmt, ap); - va_end(ap); - return err; -} - -pn_data_t *pn_condition_info(pn_condition_t *condition) -{ - assert(condition); - return condition->info; -} - -bool pn_condition_is_redirect(pn_condition_t *condition) -{ - const char *name = pn_condition_get_name(condition); - return name && (!strcmp(name, "amqp:connection:redirect") || - !strcmp(name, "amqp:link:redirect")); -} - -const char *pn_condition_redirect_host(pn_condition_t *condition) -{ - pn_data_t *data = pn_condition_info(condition); - pn_data_rewind(data); - pn_data_next(data); - pn_data_enter(data); - pn_data_lookup(data, "network-host"); - pn_bytes_t host = pn_data_get_bytes(data); - pn_data_rewind(data); - return host.start; -} - -int pn_condition_redirect_port(pn_condition_t *condition) -{ - pn_data_t *data = pn_condition_info(condition); - pn_data_rewind(data); - pn_data_next(data); - pn_data_enter(data); - pn_data_lookup(data, "port"); - int port = pn_data_get_int(data); - pn_data_rewind(data); - return port; -} - -pn_connection_t *pn_event_connection(pn_event_t *event) -{ - pn_session_t *ssn; - pn_transport_t *transport; - - switch (pn_class_id(pn_event_class(event))) { - case CID_pn_connection: - return (pn_connection_t *) pn_event_context(event); - case CID_pn_transport: - transport = pn_event_transport(event); - if (transport) - return transport->connection; - return NULL; - default: - ssn = pn_event_session(event); - if (ssn) - return pn_session_connection(ssn); - } - return NULL; -} - -pn_session_t *pn_event_session(pn_event_t *event) -{ - pn_link_t *link; - switch (pn_class_id(pn_event_class(event))) { - case CID_pn_session: - return (pn_session_t *) pn_event_context(event); - default: - link = pn_event_link(event); - if (link) - return pn_link_session(link); - } - return NULL; -} - -pn_link_t *pn_event_link(pn_event_t *event) -{ - pn_delivery_t *dlv; - switch (pn_class_id(pn_event_class(event))) { - case CID_pn_link: - return (pn_link_t *) pn_event_context(event); - default: - dlv = pn_event_delivery(event); - if (dlv) - return pn_delivery_link(dlv); - } - return NULL; -} - -pn_delivery_t *pn_event_delivery(pn_event_t *event) -{ - switch (pn_class_id(pn_event_class(event))) { - case CID_pn_delivery: - return (pn_delivery_t *) pn_event_context(event); - default: - return NULL; - } -} - -pn_transport_t *pn_event_transport(pn_event_t *event) -{ - switch (pn_class_id(pn_event_class(event))) { - case CID_pn_transport: - return (pn_transport_t *) pn_event_context(event); - default: - { - pn_connection_t *conn = pn_event_connection(event); - if (conn) - return pn_connection_transport(conn); - return NULL; - } - } -}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/error.c ---------------------------------------------------------------------- diff --git a/proton-c/src/error.c b/proton-c/src/error.c deleted file mode 100644 index 9bef0fc..0000000 --- a/proton-c/src/error.c +++ /dev/null @@ -1,135 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <proton/error.h> -#include <stdlib.h> -#include <string.h> -#include <assert.h> -#include "util.h" -#include "platform.h" - -struct pn_error_t { - char *text; - pn_error_t *root; - int code; -}; - -pn_error_t *pn_error() -{ - pn_error_t *error = (pn_error_t *) malloc(sizeof(pn_error_t)); - if (error != NULL) { - error->code = 0; - error->text = NULL; - error->root = NULL; - } - return error; -} - -void pn_error_free(pn_error_t *error) -{ - if (error) { - free(error->text); - free(error); - } -} - -void pn_error_clear(pn_error_t *error) -{ - if (error) { - error->code = 0; - free(error->text); - error->text = NULL; - error->root = NULL; - } -} - -int pn_error_set(pn_error_t *error, int code, const char *text) -{ - assert(error); - pn_error_clear(error); - if (code) { - error->code = code; - error->text = pn_strdup(text); - } - return code; -} - -int pn_error_vformat(pn_error_t *error, int code, const char *fmt, va_list ap) -{ - assert(error); - char text[1024]; - int n = vsnprintf(text, 1024, fmt, ap); - if (n >= 1024) { - text[1023] = '\0'; - } - return pn_error_set(error, code, text); -} - -int pn_error_format(pn_error_t *error, int code, const char *fmt, ...) -{ - assert(error); - va_list ap; - va_start(ap, fmt); - int rcode = pn_error_vformat(error, code, fmt, ap); - va_end(ap); - return rcode; -} - -int pn_error_code(pn_error_t *error) -{ - assert(error); - return error->code; -} - -const char *pn_error_text(pn_error_t *error) -{ - assert(error); - return error->text; -} - -int pn_error_copy(pn_error_t *error, pn_error_t *src) -{ - assert(error); - if (src) { - return pn_error_set(error, pn_error_code(src), pn_error_text(src)); - } else { - pn_error_clear(error); - return 0; - } -} - -const char *pn_code(int code) -{ - switch (code) - { - case 0: return "<ok>"; - case PN_EOS: return "PN_EOS"; - case PN_ERR: return "PN_ERR"; - case PN_OVERFLOW: return "PN_OVERFLOW"; - case PN_UNDERFLOW: return "PN_UNDERFLOW"; - case PN_STATE_ERR: return "PN_STATE_ERR"; - case PN_ARG_ERR: return "PN_ARG_ERR"; - case PN_TIMEOUT: return "PN_TIMEOUT"; - case PN_INTR: return "PN_INTR"; - case PN_OUT_OF_MEMORY: return "PN_OUT_OF_MEMORY"; - default: return "<unknown>"; - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/events/event.c ---------------------------------------------------------------------- diff --git a/proton-c/src/events/event.c b/proton-c/src/events/event.c deleted file mode 100644 index 5ad718e..0000000 --- a/proton-c/src/events/event.c +++ /dev/null @@ -1,388 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <stdio.h> -#include <proton/object.h> -#include <proton/event.h> -#include <proton/reactor.h> -#include <assert.h> - -struct pn_collector_t { - pn_list_t *pool; - pn_event_t *head; - pn_event_t *tail; - bool freed; -}; - -struct pn_event_t { - pn_list_t *pool; - const pn_class_t *clazz; - void *context; // depends on clazz - pn_record_t *attachments; - pn_event_t *next; - pn_event_type_t type; -}; - -static void pn_collector_initialize(pn_collector_t *collector) -{ - collector->pool = pn_list(PN_OBJECT, 0); - collector->head = NULL; - collector->tail = NULL; - collector->freed = false; -} - -static void pn_collector_drain(pn_collector_t *collector) -{ - assert(collector); - - while (pn_collector_peek(collector)) { - pn_collector_pop(collector); - } - - assert(!collector->head); - assert(!collector->tail); -} - -static void pn_collector_shrink(pn_collector_t *collector) -{ - assert(collector); - pn_list_clear(collector->pool); -} - -static void pn_collector_finalize(pn_collector_t *collector) -{ - pn_collector_drain(collector); - pn_decref(collector->pool); -} - -static int pn_collector_inspect(pn_collector_t *collector, pn_string_t *dst) -{ - assert(collector); - int err = pn_string_addf(dst, "EVENTS["); - if (err) return err; - pn_event_t *event = collector->head; - bool first = true; - while (event) { - if (first) { - first = false; - } else { - err = pn_string_addf(dst, ", "); - if (err) return err; - } - err = pn_inspect(event, dst); - if (err) return err; - event = event->next; - } - return pn_string_addf(dst, "]"); -} - -#define pn_collector_hashcode NULL -#define pn_collector_compare NULL - -PN_CLASSDEF(pn_collector) - -pn_collector_t *pn_collector(void) -{ - return pn_collector_new(); -} - -void pn_collector_free(pn_collector_t *collector) -{ - assert(collector); - pn_collector_release(collector); - pn_decref(collector); -} - -void pn_collector_release(pn_collector_t *collector) -{ - assert(collector); - if (!collector->freed) { - collector->freed = true; - pn_collector_drain(collector); - pn_collector_shrink(collector); - } -} - -pn_event_t *pn_event(void); - -pn_event_t *pn_collector_put(pn_collector_t *collector, - const pn_class_t *clazz, void *context, - pn_event_type_t type) -{ - if (!collector) { - return NULL; - } - - assert(context); - - if (collector->freed) { - return NULL; - } - - pn_event_t *tail = collector->tail; - if (tail && tail->type == type && tail->context == context) { - return NULL; - } - - clazz = clazz->reify(context); - - pn_event_t *event = (pn_event_t *) pn_list_pop(collector->pool); - - if (!event) { - event = pn_event(); - } - - event->pool = collector->pool; - pn_incref(event->pool); - - if (tail) { - tail->next = event; - collector->tail = event; - } else { - collector->tail = event; - collector->head = event; - } - - event->clazz = clazz; - event->context = context; - event->type = type; - pn_class_incref(clazz, event->context); - - return event; -} - -pn_event_t *pn_collector_peek(pn_collector_t *collector) -{ - return collector->head; -} - -bool pn_collector_pop(pn_collector_t *collector) -{ - pn_event_t *event = collector->head; - if (event) { - collector->head = event->next; - } else { - return false; - } - - if (!collector->head) { - collector->tail = NULL; - } - - pn_decref(event); - return true; -} - -bool pn_collector_more(pn_collector_t *collector) -{ - assert(collector); - return collector->head && collector->head->next; -} - -static void pn_event_initialize(pn_event_t *event) -{ - event->pool = NULL; - event->type = PN_EVENT_NONE; - event->clazz = NULL; - event->context = NULL; - event->next = NULL; - event->attachments = pn_record(); -} - -static void pn_event_finalize(pn_event_t *event) { - // decref before adding to the free list - if (event->clazz && event->context) { - pn_class_decref(event->clazz, event->context); - } - - pn_list_t *pool = event->pool; - - if (pool && pn_refcount(pool) > 1) { - event->pool = NULL; - event->type = PN_EVENT_NONE; - event->clazz = NULL; - event->context = NULL; - event->next = NULL; - pn_record_clear(event->attachments); - pn_list_add(pool, event); - } else { - pn_decref(event->attachments); - } - - pn_decref(pool); -} - -static int pn_event_inspect(pn_event_t *event, pn_string_t *dst) -{ - assert(event); - assert(dst); - const char *name = pn_event_type_name(event->type); - int err; - if (name) { - err = pn_string_addf(dst, "(%s", pn_event_type_name(event->type)); - } else { - err = pn_string_addf(dst, "(<%u>", (unsigned int) event->type); - } - if (err) return err; - if (event->context) { - err = pn_string_addf(dst, ", "); - if (err) return err; - err = pn_class_inspect(event->clazz, event->context, dst); - if (err) return err; - } - - return pn_string_addf(dst, ")"); -} - -#define pn_event_hashcode NULL -#define pn_event_compare NULL - -PN_CLASSDEF(pn_event) - -pn_event_t *pn_event(void) -{ - return pn_event_new(); -} - -pn_event_type_t pn_event_type(pn_event_t *event) -{ - return event->type; -} - -const pn_class_t *pn_event_class(pn_event_t *event) -{ - assert(event); - return event->clazz; -} - -void *pn_event_context(pn_event_t *event) -{ - assert(event); - return event->context; -} - -pn_record_t *pn_event_attachments(pn_event_t *event) -{ - assert(event); - return event->attachments; -} - -pn_handler_t *pn_event_root(pn_event_t *event) -{ - assert(event); - pn_handler_t *h = pn_record_get_handler(event->attachments); - return h; -} - -void pni_event_set_root(pn_event_t *event, pn_handler_t *handler) { - pn_record_set_handler(event->attachments, handler); -} - -const char *pn_event_type_name(pn_event_type_t type) -{ - switch (type) { - case PN_EVENT_NONE: - return "PN_EVENT_NONE"; - case PN_REACTOR_INIT: - return "PN_REACTOR_INIT"; - case PN_REACTOR_QUIESCED: - return "PN_REACTOR_QUIESCED"; - case PN_REACTOR_FINAL: - return "PN_REACTOR_FINAL"; - case PN_TIMER_TASK: - return "PN_TIMER_TASK"; - case PN_CONNECTION_INIT: - return "PN_CONNECTION_INIT"; - case PN_CONNECTION_BOUND: - return "PN_CONNECTION_BOUND"; - case PN_CONNECTION_UNBOUND: - return "PN_CONNECTION_UNBOUND"; - case PN_CONNECTION_REMOTE_OPEN: - return "PN_CONNECTION_REMOTE_OPEN"; - case PN_CONNECTION_LOCAL_OPEN: - return "PN_CONNECTION_LOCAL_OPEN"; - case PN_CONNECTION_REMOTE_CLOSE: - return "PN_CONNECTION_REMOTE_CLOSE"; - case PN_CONNECTION_LOCAL_CLOSE: - return "PN_CONNECTION_LOCAL_CLOSE"; - case PN_CONNECTION_FINAL: - return "PN_CONNECTION_FINAL"; - case PN_SESSION_INIT: - return "PN_SESSION_INIT"; - case PN_SESSION_REMOTE_OPEN: - return "PN_SESSION_REMOTE_OPEN"; - case PN_SESSION_LOCAL_OPEN: - return "PN_SESSION_LOCAL_OPEN"; - case PN_SESSION_REMOTE_CLOSE: - return "PN_SESSION_REMOTE_CLOSE"; - case PN_SESSION_LOCAL_CLOSE: - return "PN_SESSION_LOCAL_CLOSE"; - case PN_SESSION_FINAL: - return "PN_SESSION_FINAL"; - case PN_LINK_INIT: - return "PN_LINK_INIT"; - case PN_LINK_REMOTE_OPEN: - return "PN_LINK_REMOTE_OPEN"; - case PN_LINK_LOCAL_OPEN: - return "PN_LINK_LOCAL_OPEN"; - case PN_LINK_REMOTE_CLOSE: - return "PN_LINK_REMOTE_CLOSE"; - case PN_LINK_LOCAL_DETACH: - return "PN_LINK_LOCAL_DETACH"; - case PN_LINK_REMOTE_DETACH: - return "PN_LINK_REMOTE_DETACH"; - case PN_LINK_LOCAL_CLOSE: - return "PN_LINK_LOCAL_CLOSE"; - case PN_LINK_FLOW: - return "PN_LINK_FLOW"; - case PN_LINK_FINAL: - return "PN_LINK_FINAL"; - case PN_DELIVERY: - return "PN_DELIVERY"; - case PN_TRANSPORT: - return "PN_TRANSPORT"; - case PN_TRANSPORT_AUTHENTICATED: - return "PN_TRANSPORT_AUTHENTICATED"; - case PN_TRANSPORT_ERROR: - return "PN_TRANSPORT_ERROR"; - case PN_TRANSPORT_HEAD_CLOSED: - return "PN_TRANSPORT_HEAD_CLOSED"; - case PN_TRANSPORT_TAIL_CLOSED: - return "PN_TRANSPORT_TAIL_CLOSED"; - case PN_TRANSPORT_CLOSED: - return "PN_TRANSPORT_CLOSED"; - case PN_SELECTABLE_INIT: - return "PN_SELECTABLE_INIT"; - case PN_SELECTABLE_UPDATED: - return "PN_SELECTABLE_UPDATED"; - case PN_SELECTABLE_READABLE: - return "PN_SELECTABLE_READABLE"; - case PN_SELECTABLE_WRITABLE: - return "PN_SELECTABLE_WRITABLE"; - case PN_SELECTABLE_ERROR: - return "PN_SELECTABLE_ERROR"; - case PN_SELECTABLE_EXPIRED: - return "PN_SELECTABLE_EXPIRED"; - case PN_SELECTABLE_FINAL: - return "PN_SELECTABLE_FINAL"; - } - - return NULL; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/extra/parser.c ---------------------------------------------------------------------- diff --git a/proton-c/src/extra/parser.c b/proton-c/src/extra/parser.c new file mode 100644 index 0000000..36fb4fb --- /dev/null +++ b/proton-c/src/extra/parser.c @@ -0,0 +1,423 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/parser.h> + +#include "platform/platform.h" +#include "scanner.h" + +#include <proton/error.h> + +#include <stdlib.h> +#include <string.h> +#include <ctype.h> + +struct pn_parser_t { + pn_scanner_t *scanner; + char *atoms; + size_t size; + size_t capacity; + int error_code; +}; + +pn_parser_t *pn_parser() +{ + pn_parser_t *parser = (pn_parser_t *) malloc(sizeof(pn_parser_t)); + if (parser != NULL) { + parser->scanner = pn_scanner(); + parser->atoms = NULL; + parser->size = 0; + parser->capacity = 0; + } + return parser; +} + +static void pni_parser_ensure(pn_parser_t *parser, size_t size) +{ + while (parser->capacity - parser->size < size) { + parser->capacity = parser->capacity ? 2 * parser->capacity : 1024; + parser->atoms = (char *) realloc(parser->atoms, parser->capacity); + } +} + +int pn_parser_err(pn_parser_t *parser, int code, const char *fmt, ...) +{ + va_list ap; + va_start(ap, fmt); + int err = pn_scanner_verr(parser->scanner, code, fmt, ap); + va_end(ap); + return err; +} + +int pn_parser_errno(pn_parser_t *parser) +{ + return pn_scanner_errno(parser->scanner); +} + +const char *pn_parser_error(pn_parser_t *parser) +{ + return pn_scanner_error(parser->scanner); +} + +void pn_parser_free(pn_parser_t *parser) +{ + if (parser) { + pn_scanner_free(parser->scanner); + free(parser->atoms); + free(parser); + } +} + +static int pni_parser_shift(pn_parser_t *parser) +{ + return pn_scanner_shift(parser->scanner); +} + +static pn_token_t pni_parser_token(pn_parser_t *parser) +{ + return pn_scanner_token(parser->scanner); +} + +static int pni_parser_value(pn_parser_t *parser, pn_data_t *data); + +static int pni_parser_descriptor(pn_parser_t *parser, pn_data_t *data) +{ + if (pni_parser_token(parser).type == PN_TOK_AT) { + int err = pni_parser_shift(parser); + if (err) return err; + + err = pn_data_put_described(data); + if (err) return pn_parser_err(parser, err, "error writing described"); + pn_data_enter(data); + for (int i = 0; i < 2; i++) { + err = pni_parser_value(parser, data); + if (err) return err; + } + pn_data_exit(data); + return 0; + } else { + return pn_parser_err(parser, PN_ERR, "expecting '@'"); + } +} + +static int pni_parser_map(pn_parser_t *parser, pn_data_t *data) +{ + if (pni_parser_token(parser).type == PN_TOK_LBRACE) { + int err = pni_parser_shift(parser); + if (err) return err; + + err = pn_data_put_map(data); + if (err) return pn_parser_err(parser, err, "error writing map"); + + pn_data_enter(data); + + if (pni_parser_token(parser).type != PN_TOK_RBRACE) { + while (true) { + err = pni_parser_value(parser, data); + if (err) return err; + + if (pni_parser_token(parser).type == PN_TOK_EQUAL) { + err = pni_parser_shift(parser); + if (err) return err; + } else { + return pn_parser_err(parser, PN_ERR, "expecting '='"); + } + + err = pni_parser_value(parser, data); + if (err) return err; + + if (pni_parser_token(parser).type == PN_TOK_COMMA) { + err = pni_parser_shift(parser); + if (err) return err; + } else { + break; + } + } + } + + pn_data_exit(data); + + if (pni_parser_token(parser).type == PN_TOK_RBRACE) { + return pni_parser_shift(parser); + } else { + return pn_parser_err(parser, PN_ERR, "expecting '}'"); + } + } else { + return pn_parser_err(parser, PN_ERR, "expecting '{'"); + } +} + +static int pni_parser_list(pn_parser_t *parser, pn_data_t *data) +{ + int err; + + if (pni_parser_token(parser).type == PN_TOK_LBRACKET) { + err = pni_parser_shift(parser); + if (err) return err; + + err = pn_data_put_list(data); + if (err) return pn_parser_err(parser, err, "error writing list"); + + pn_data_enter(data); + + if (pni_parser_token(parser).type != PN_TOK_RBRACKET) { + while (true) { + err = pni_parser_value(parser, data); + if (err) return err; + + if (pni_parser_token(parser).type == PN_TOK_COMMA) { + err = pni_parser_shift(parser); + if (err) return err; + } else { + break; + } + } + } + + pn_data_exit(data); + + if (pni_parser_token(parser).type == PN_TOK_RBRACKET) { + return pni_parser_shift(parser); + } else { + return pn_parser_err(parser, PN_ERR, "expecting ']'"); + } + } else { + return pn_parser_err(parser, PN_ERR, "expecting '['"); + } +} + +static void pni_parser_append_tok(pn_parser_t *parser, char *dst, int *idx) +{ + memcpy(dst + *idx, pni_parser_token(parser).start, pni_parser_token(parser).size); + *idx += pni_parser_token(parser).size; +} + +static int pni_parser_number(pn_parser_t *parser, pn_data_t *data) +{ + bool dbl = false; + char number[1024]; + int idx = 0; + int err; + + bool negate = false; + + if (pni_parser_token(parser).type == PN_TOK_NEG || pni_parser_token(parser).type == PN_TOK_POS) { + if (pni_parser_token(parser).type == PN_TOK_NEG) + negate = !negate; + err = pni_parser_shift(parser); + if (err) return err; + } + + if (pni_parser_token(parser).type == PN_TOK_FLOAT || pni_parser_token(parser).type == PN_TOK_INT) { + dbl = pni_parser_token(parser).type == PN_TOK_FLOAT; + pni_parser_append_tok(parser, number, &idx); + err = pni_parser_shift(parser); + if (err) return err; + } else { + return pn_parser_err(parser, PN_ERR, "expecting FLOAT or INT"); + } + + number[idx] = '\0'; + + if (dbl) { + double value = atof(number); + if (negate) { + value = -value; + } + err = pn_data_put_double(data, value); + if (err) return pn_parser_err(parser, err, "error writing double"); + } else { + int64_t value = pn_i_atoll(number); + if (negate) { + value = -value; + } + err = pn_data_put_long(data, value); + if (err) return pn_parser_err(parser, err, "error writing long"); + } + + return 0; +} + +static int pni_parser_unquote(pn_parser_t *parser, char *dst, const char *src, size_t *n) +{ + size_t idx = 0; + bool escape = false; + int start, end; + if (src[0] != '"') { + if (src[1] == '"') { + start = 2; + end = *n - 1; + } else { + start = 1; + end = *n; + } + } else { + start = 1; + end = *n - 1; + } + for (int i = start; i < end; i++) + { + char c = src[i]; + if (escape) { + switch (c) { + case '"': + case '\\': + case '/': + dst[idx++] = c; + escape = false; + break; + case 'b': + dst[idx++] = '\b'; + break; + case 'f': + dst[idx++] = '\f'; + break; + case 'n': + dst[idx++] = '\n'; + break; + case 'r': + dst[idx++] = '\r'; + break; + case 't': + dst[idx++] = '\t'; + break; + case 'x': + { + char n1 = toupper(src[i+1]); + char n2 = n1 ? toupper(src[i+2]) : 0; + if (!n2) { + return pn_parser_err(parser, PN_ERR, "truncated escape code"); + } + int d1 = isdigit(n1) ? n1 - '0' : n1 - 'A' + 10; + int d2 = isdigit(n2) ? n2 - '0' : n2 - 'A' + 10; + dst[idx++] = d1*16 + d2; + i += 2; + } + break; + // XXX: need to handle unicode escapes: 'u' + default: + return pn_parser_err(parser, PN_ERR, "unrecognized escape code"); + } + escape = false; + } else { + switch (c) + { + case '\\': + escape = true; + break; + default: + dst[idx++] = c; + break; + } + } + } + dst[idx++] = '\0'; + *n = idx; + return 0; +} + +static int pni_parser_value(pn_parser_t *parser, pn_data_t *data) +{ + int err; + size_t n; + char *dst; + + pn_token_t tok = pni_parser_token(parser); + + switch (tok.type) + { + case PN_TOK_AT: + return pni_parser_descriptor(parser, data); + case PN_TOK_LBRACE: + return pni_parser_map(parser, data); + case PN_TOK_LBRACKET: + return pni_parser_list(parser, data); + case PN_TOK_BINARY: + case PN_TOK_SYMBOL: + case PN_TOK_STRING: + n = tok.size; + pni_parser_ensure(parser, n); + dst = parser->atoms + parser->size; + err = pni_parser_unquote(parser, dst, tok.start, &n); + if (err) return err; + parser->size += n; + switch (tok.type) { + case PN_TOK_BINARY: + err = pn_data_put_binary(data, pn_bytes(n - 1, dst)); + break; + case PN_TOK_STRING: + err = pn_data_put_string(data, pn_bytes(n - 1, dst)); + break; + case PN_TOK_SYMBOL: + err = pn_data_put_symbol(data, pn_bytes(n - 1, dst)); + break; + default: + return pn_parser_err(parser, PN_ERR, "internal error"); + } + if (err) return pn_parser_err(parser, err, "error writing string/binary/symbol"); + return pni_parser_shift(parser); + case PN_TOK_POS: + case PN_TOK_NEG: + case PN_TOK_FLOAT: + case PN_TOK_INT: + return pni_parser_number(parser, data); + case PN_TOK_TRUE: + err = pn_data_put_bool(data, true); + if (err) return pn_parser_err(parser, err, "error writing boolean"); + return pni_parser_shift(parser); + case PN_TOK_FALSE: + err = pn_data_put_bool(data, false); + if (err) return pn_parser_err(parser, err, "error writing boolean"); + return pni_parser_shift(parser); + case PN_TOK_NULL: + err = pn_data_put_null(data); + if (err) return pn_parser_err(parser, err, "error writing null"); + return pni_parser_shift(parser); + default: + return pn_parser_err(parser, PN_ERR, "expecting one of '[', '{', STRING, " + "SYMBOL, BINARY, true, false, null, NUMBER"); + } +} + +static int pni_parser_parse_r(pn_parser_t *parser, pn_data_t *data) +{ + while (true) { + int err; + switch (pni_parser_token(parser).type) + { + case PN_TOK_EOS: + return 0; + case PN_TOK_ERR: + return PN_ERR; + default: + err = pni_parser_value(parser, data); + if (err) return err; + } + } +} + +int pn_parser_parse(pn_parser_t *parser, const char *str, pn_data_t *data) +{ + int err = pn_scanner_start(parser->scanner, str); + if (err) return err; + parser->size = 0; + return pni_parser_parse_r(parser, data); +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
