http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/core/engine.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/engine.c b/proton-c/src/core/engine.c deleted file mode 100644 index 8c2aeb0..0000000 --- a/proton-c/src/core/engine.c +++ /dev/null @@ -1,2277 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "engine-internal.h" -#include <stdlib.h> -#include <string.h> -#include "protocol.h" - -#include <assert.h> -#include <stdarg.h> -#include <stdio.h> - -#include "platform/platform.h" -#include "platform/platform_fmt.h" -#include "transport.h" - -static void pni_session_bound(pn_session_t *ssn); -static void pni_link_bound(pn_link_t *link); - - -// endpoints - -static pn_connection_t *pni_ep_get_connection(pn_endpoint_t *endpoint) -{ - switch (endpoint->type) { - case CONNECTION: - return (pn_connection_t *) endpoint; - case SESSION: - return ((pn_session_t *) endpoint)->connection; - case SENDER: - case RECEIVER: - return ((pn_link_t *) endpoint)->session->connection; - } - - return NULL; -} - -static pn_event_type_t endpoint_event(pn_endpoint_type_t type, bool open) { - switch (type) { - case CONNECTION: - return open ? PN_CONNECTION_LOCAL_OPEN : PN_CONNECTION_LOCAL_CLOSE; - case SESSION: - return open ? PN_SESSION_LOCAL_OPEN : PN_SESSION_LOCAL_CLOSE; - case SENDER: - case RECEIVER: - return open ? 
PN_LINK_LOCAL_OPEN : PN_LINK_LOCAL_CLOSE; - default: - assert(false); - return PN_EVENT_NONE; - } -} - -static void pn_endpoint_open(pn_endpoint_t *endpoint) -{ - if (!(endpoint->state & PN_LOCAL_ACTIVE)) { - PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE); - pn_connection_t *conn = pni_ep_get_connection(endpoint); - pn_collector_put(conn->collector, PN_OBJECT, endpoint, - endpoint_event(endpoint->type, true)); - pn_modified(conn, endpoint, true); - } -} - -static void pn_endpoint_close(pn_endpoint_t *endpoint) -{ - if (!(endpoint->state & PN_LOCAL_CLOSED)) { - PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED); - pn_connection_t *conn = pni_ep_get_connection(endpoint); - pn_collector_put(conn->collector, PN_OBJECT, endpoint, - endpoint_event(endpoint->type, false)); - pn_modified(conn, endpoint, true); - } -} - -void pn_connection_reset(pn_connection_t *connection) -{ - assert(connection); - pn_endpoint_t *endpoint = &connection->endpoint; - endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT; -} - -void pn_connection_open(pn_connection_t *connection) -{ - assert(connection); - pn_endpoint_open(&connection->endpoint); -} - -void pn_connection_close(pn_connection_t *connection) -{ - assert(connection); - pn_endpoint_close(&connection->endpoint); -} - -static void pni_endpoint_tini(pn_endpoint_t *endpoint); - -void pn_connection_release(pn_connection_t *connection) -{ - assert(!connection->endpoint.freed); - // free those endpoints that haven't been freed by the application - LL_REMOVE(connection, endpoint, &connection->endpoint); - while (connection->endpoint_head) { - pn_endpoint_t *ep = connection->endpoint_head; - switch (ep->type) { - case SESSION: - // note: this will free all child links: - pn_session_free((pn_session_t *)ep); - break; - case SENDER: - case RECEIVER: - pn_link_free((pn_link_t *)ep); - break; - default: - assert(false); - } - } - connection->endpoint.freed = true; - if (!connection->transport) { - // no transport available to consume transport 
work items, - // so manually clear them: - pn_ep_incref(&connection->endpoint); - pn_connection_unbound(connection); - } - pn_ep_decref(&connection->endpoint); -} - -void pn_connection_free(pn_connection_t *connection) { - pn_connection_release(connection); - pn_decref(connection); -} - -void pn_connection_bound(pn_connection_t *connection) -{ - pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_BOUND); - pn_ep_incref(&connection->endpoint); - - size_t nsessions = pn_list_size(connection->sessions); - for (size_t i = 0; i < nsessions; i++) { - pni_session_bound((pn_session_t *) pn_list_get(connection->sessions, i)); - } -} - -// invoked when transport has been removed: -void pn_connection_unbound(pn_connection_t *connection) -{ - connection->transport = NULL; - if (connection->endpoint.freed) { - // connection has been freed prior to unbinding, thus it - // cannot be re-assigned to a new transport. Clear the - // transport work lists to allow the connection to be freed. - while (connection->transport_head) { - pn_clear_modified(connection, connection->transport_head); - } - while (connection->tpwork_head) { - pn_clear_tpwork(connection->tpwork_head); - } - } - pn_ep_decref(&connection->endpoint); -} - -pn_record_t *pn_connection_attachments(pn_connection_t *connection) -{ - assert(connection); - return connection->context; -} - -void *pn_connection_get_context(pn_connection_t *conn) -{ - // XXX: we should really assert on conn here, but this causes - // messenger tests to fail - return conn ? 
pn_record_get(conn->context, PN_LEGCTX) : NULL; -} - -void pn_connection_set_context(pn_connection_t *conn, void *context) -{ - assert(conn); - pn_record_set(conn->context, PN_LEGCTX, context); -} - -pn_transport_t *pn_connection_transport(pn_connection_t *connection) -{ - assert(connection); - return connection->transport; -} - -void pn_condition_init(pn_condition_t *condition) -{ - condition->name = pn_string(NULL); - condition->description = pn_string(NULL); - condition->info = pn_data(0); -} - -pn_condition_t *pn_condition() { - pn_condition_t *c = (pn_condition_t*)malloc(sizeof(pn_condition_t)); - pn_condition_init(c); - return c; -} - -void pn_condition_tini(pn_condition_t *condition) -{ - pn_data_free(condition->info); - pn_free(condition->description); - pn_free(condition->name); -} - -void pn_condition_free(pn_condition_t *c) { - if (c) { - pn_condition_clear(c); - pn_condition_tini(c); - free(c); - } -} - -static void pni_add_session(pn_connection_t *conn, pn_session_t *ssn) -{ - pn_list_add(conn->sessions, ssn); - ssn->connection = conn; - pn_incref(conn); // keep around until finalized - pn_ep_incref(&conn->endpoint); -} - -static void pni_remove_session(pn_connection_t *conn, pn_session_t *ssn) -{ - if (pn_list_remove(conn->sessions, ssn)) { - pn_ep_decref(&conn->endpoint); - LL_REMOVE(conn, endpoint, &ssn->endpoint); - } -} - -pn_connection_t *pn_session_connection(pn_session_t *session) -{ - if (!session) return NULL; - return session->connection; -} - -void pn_session_open(pn_session_t *session) -{ - assert(session); - pn_endpoint_open(&session->endpoint); -} - -void pn_session_close(pn_session_t *session) -{ - assert(session); - pn_endpoint_close(&session->endpoint); -} - -void pn_session_free(pn_session_t *session) -{ - assert(!session->endpoint.freed); - while(pn_list_size(session->links)) { - pn_link_t *link = (pn_link_t *)pn_list_get(session->links, 0); - pn_link_free(link); - } - pni_remove_session(session->connection, session); - 
pn_list_add(session->connection->freed, session); - session->endpoint.freed = true; - pn_ep_decref(&session->endpoint); - - // the finalize logic depends on endpoint.freed, so we incref/decref - // to give it a chance to rerun - pn_incref(session); - pn_decref(session); -} - -pn_record_t *pn_session_attachments(pn_session_t *session) -{ - assert(session); - return session->context; -} - -void *pn_session_get_context(pn_session_t *session) -{ - return session ? pn_record_get(session->context, PN_LEGCTX) : 0; -} - -void pn_session_set_context(pn_session_t *session, void *context) -{ - assert(context); - pn_record_set(session->context, PN_LEGCTX, context); -} - - -static void pni_add_link(pn_session_t *ssn, pn_link_t *link) -{ - pn_list_add(ssn->links, link); - link->session = ssn; - pn_ep_incref(&ssn->endpoint); -} - -static void pni_remove_link(pn_session_t *ssn, pn_link_t *link) -{ - if (pn_list_remove(ssn->links, link)) { - pn_ep_decref(&ssn->endpoint); - LL_REMOVE(ssn->connection, endpoint, &link->endpoint); - } -} - -void pn_link_open(pn_link_t *link) -{ - assert(link); - pn_endpoint_open(&link->endpoint); -} - -void pn_link_close(pn_link_t *link) -{ - assert(link); - pn_endpoint_close(&link->endpoint); -} - -void pn_link_detach(pn_link_t *link) -{ - assert(link); - if (link->detached) return; - - link->detached = true; - pn_collector_put(link->session->connection->collector, PN_OBJECT, link, PN_LINK_LOCAL_DETACH); - pn_modified(link->session->connection, &link->endpoint, true); - -} - -static void pni_terminus_free(pn_terminus_t *terminus) -{ - pn_free(terminus->address); - pn_free(terminus->properties); - pn_free(terminus->capabilities); - pn_free(terminus->outcomes); - pn_free(terminus->filter); -} - -void pn_link_free(pn_link_t *link) -{ - assert(!link->endpoint.freed); - pni_remove_link(link->session, link); - pn_list_add(link->session->freed, link); - pn_delivery_t *delivery = link->unsettled_head; - while (delivery) { - pn_delivery_t *next = 
delivery->unsettled_next; - pn_delivery_settle(delivery); - delivery = next; - } - link->endpoint.freed = true; - pn_ep_decref(&link->endpoint); - - // the finalize logic depends on endpoint.freed (modified above), so - // we incref/decref to give it a chance to rerun - pn_incref(link); - pn_decref(link); -} - -void *pn_link_get_context(pn_link_t *link) -{ - assert(link); - return pn_record_get(link->context, PN_LEGCTX); -} - -void pn_link_set_context(pn_link_t *link, void *context) -{ - assert(link); - pn_record_set(link->context, PN_LEGCTX, context); -} - -pn_record_t *pn_link_attachments(pn_link_t *link) -{ - assert(link); - return link->context; -} - -void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn) -{ - endpoint->type = (pn_endpoint_type_t) type; - endpoint->referenced = true; - endpoint->state = PN_LOCAL_UNINIT | PN_REMOTE_UNINIT; - endpoint->error = pn_error(); - pn_condition_init(&endpoint->condition); - pn_condition_init(&endpoint->remote_condition); - endpoint->endpoint_next = NULL; - endpoint->endpoint_prev = NULL; - endpoint->transport_next = NULL; - endpoint->transport_prev = NULL; - endpoint->modified = false; - endpoint->freed = false; - endpoint->refcount = 1; - //fprintf(stderr, "initting 0x%lx\n", (uintptr_t) endpoint); - - LL_ADD(conn, endpoint, endpoint); -} - -void pn_ep_incref(pn_endpoint_t *endpoint) -{ - endpoint->refcount++; -} - -static pn_event_type_t pn_final_type(pn_endpoint_type_t type) { - switch (type) { - case CONNECTION: - return PN_CONNECTION_FINAL; - case SESSION: - return PN_SESSION_FINAL; - case SENDER: - case RECEIVER: - return PN_LINK_FINAL; - default: - assert(false); - return PN_EVENT_NONE; - } -} - -static pn_endpoint_t *pn_ep_parent(pn_endpoint_t *endpoint) { - switch (endpoint->type) { - case CONNECTION: - return NULL; - case SESSION: - return &((pn_session_t *) endpoint)->connection->endpoint; - case SENDER: - case RECEIVER: - return &((pn_link_t *) endpoint)->session->endpoint; - default: 
- assert(false); - return NULL; - } -} - -void pn_ep_decref(pn_endpoint_t *endpoint) -{ - assert(endpoint->refcount > 0); - endpoint->refcount--; - if (endpoint->refcount == 0) { - pn_connection_t *conn = pni_ep_get_connection(endpoint); - pn_collector_put(conn->collector, PN_OBJECT, endpoint, pn_final_type(endpoint->type)); - } -} - -static void pni_endpoint_tini(pn_endpoint_t *endpoint) -{ - pn_error_free(endpoint->error); - pn_condition_tini(&endpoint->remote_condition); - pn_condition_tini(&endpoint->condition); -} - -static void pni_free_children(pn_list_t *children, pn_list_t *freed) -{ - while (pn_list_size(children) > 0) { - pn_endpoint_t *endpoint = (pn_endpoint_t *) pn_list_get(children, 0); - assert(!endpoint->referenced); - pn_free(endpoint); - } - - while (pn_list_size(freed) > 0) { - pn_endpoint_t *endpoint = (pn_endpoint_t *) pn_list_get(freed, 0); - assert(!endpoint->referenced); - pn_free(endpoint); - } - - pn_free(children); - pn_free(freed); -} - -static void pn_connection_finalize(void *object) -{ - pn_connection_t *conn = (pn_connection_t *) object; - pn_endpoint_t *endpoint = &conn->endpoint; - - if (conn->transport) { - assert(!conn->transport->referenced); - pn_free(conn->transport); - } - - // freeing the transport could post events - if (pn_refcount(conn) > 0) { - return; - } - - pni_free_children(conn->sessions, conn->freed); - pn_free(conn->context); - pn_decref(conn->collector); - - pn_free(conn->container); - pn_free(conn->hostname); - pn_free(conn->auth_user); - pn_free(conn->auth_password); - pn_free(conn->offered_capabilities); - pn_free(conn->desired_capabilities); - pn_free(conn->properties); - pni_endpoint_tini(endpoint); - pn_free(conn->delivery_pool); -} - -#define pn_connection_initialize NULL -#define pn_connection_hashcode NULL -#define pn_connection_compare NULL -#define pn_connection_inspect NULL - -pn_connection_t *pn_connection() -{ - static const pn_class_t clazz = PN_CLASS(pn_connection); - pn_connection_t *conn = 
(pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t)); - if (!conn) return NULL; - - conn->endpoint_head = NULL; - conn->endpoint_tail = NULL; - pn_endpoint_init(&conn->endpoint, CONNECTION, conn); - conn->transport_head = NULL; - conn->transport_tail = NULL; - conn->sessions = pn_list(PN_WEAKREF, 0); - conn->freed = pn_list(PN_WEAKREF, 0); - conn->transport = NULL; - conn->work_head = NULL; - conn->work_tail = NULL; - conn->tpwork_head = NULL; - conn->tpwork_tail = NULL; - conn->container = pn_string(NULL); - conn->hostname = pn_string(NULL); - conn->auth_user = pn_string(NULL); - conn->auth_password = pn_string(NULL); - conn->offered_capabilities = pn_data(0); - conn->desired_capabilities = pn_data(0); - conn->properties = pn_data(0); - conn->collector = NULL; - conn->context = pn_record(); - conn->delivery_pool = pn_list(PN_OBJECT, 0); - - return conn; -} - -static const pn_event_type_t endpoint_init_event_map[] = { - PN_CONNECTION_INIT, /* CONNECTION */ - PN_SESSION_INIT, /* SESSION */ - PN_LINK_INIT, /* SENDER */ - PN_LINK_INIT}; /* RECEIVER */ - -void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector) -{ - pn_decref(connection->collector); - connection->collector = collector; - pn_incref(connection->collector); - pn_endpoint_t *endpoint = connection->endpoint_head; - while (endpoint) { - pn_collector_put(connection->collector, PN_OBJECT, endpoint, endpoint_init_event_map[endpoint->type]); - endpoint = endpoint->endpoint_next; - } -} - -pn_collector_t* pn_connection_collector(pn_connection_t *connection) { - return connection->collector; -} - -pn_state_t pn_connection_state(pn_connection_t *connection) -{ - return connection ? connection->endpoint.state : 0; -} - -pn_error_t *pn_connection_error(pn_connection_t *connection) -{ - return connection ? 
connection->endpoint.error : NULL; -} - -const char *pn_connection_get_container(pn_connection_t *connection) -{ - assert(connection); - return pn_string_get(connection->container); -} - -void pn_connection_set_container(pn_connection_t *connection, const char *container) -{ - assert(connection); - pn_string_set(connection->container, container); -} - -const char *pn_connection_get_hostname(pn_connection_t *connection) -{ - assert(connection); - return pn_string_get(connection->hostname); -} - -void pn_connection_set_hostname(pn_connection_t *connection, const char *hostname) -{ - assert(connection); - pn_string_set(connection->hostname, hostname); -} - -const char *pn_connection_get_user(pn_connection_t *connection) -{ - assert(connection); - return pn_string_get(connection->auth_user); -} - -void pn_connection_set_user(pn_connection_t *connection, const char *user) -{ - assert(connection); - pn_string_set(connection->auth_user, user); -} - -void pn_connection_set_password(pn_connection_t *connection, const char *password) -{ - assert(connection); - // Make sure the previous password is erased, if there was one. - size_t n = pn_string_size(connection->auth_password); - const char* s = pn_string_get(connection->auth_password); - if (n > 0 && s) memset((void*)s, 0, n); - pn_string_set(connection->auth_password, password); -} - -pn_data_t *pn_connection_offered_capabilities(pn_connection_t *connection) -{ - assert(connection); - return connection->offered_capabilities; -} - -pn_data_t *pn_connection_desired_capabilities(pn_connection_t *connection) -{ - assert(connection); - return connection->desired_capabilities; -} - -pn_data_t *pn_connection_properties(pn_connection_t *connection) -{ - assert(connection); - return connection->properties; -} - -pn_data_t *pn_connection_remote_offered_capabilities(pn_connection_t *connection) -{ - assert(connection); - return connection->transport ? 
connection->transport->remote_offered_capabilities : NULL; -} - -pn_data_t *pn_connection_remote_desired_capabilities(pn_connection_t *connection) -{ - assert(connection); - return connection->transport ? connection->transport->remote_desired_capabilities : NULL; -} - -pn_data_t *pn_connection_remote_properties(pn_connection_t *connection) -{ - assert(connection); - return connection->transport ? connection->transport->remote_properties : NULL; -} - -const char *pn_connection_remote_container(pn_connection_t *connection) -{ - assert(connection); - return connection->transport ? connection->transport->remote_container : NULL; -} - -const char *pn_connection_remote_hostname(pn_connection_t *connection) -{ - assert(connection); - return connection->transport ? connection->transport->remote_hostname : NULL; -} - -pn_delivery_t *pn_work_head(pn_connection_t *connection) -{ - assert(connection); - return connection->work_head; -} - -pn_delivery_t *pn_work_next(pn_delivery_t *delivery) -{ - assert(delivery); - - if (delivery->work) - return delivery->work_next; - else - return pn_work_head(delivery->link->session->connection); -} - -static void pni_add_work(pn_connection_t *connection, pn_delivery_t *delivery) -{ - if (!delivery->work) - { - assert(!delivery->local.settled); // never allow settled deliveries - LL_ADD(connection, work, delivery); - delivery->work = true; - } -} - -static void pni_clear_work(pn_connection_t *connection, pn_delivery_t *delivery) -{ - if (delivery->work) - { - LL_REMOVE(connection, work, delivery); - delivery->work = false; - } -} - -void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery) -{ - pn_link_t *link = pn_delivery_link(delivery); - pn_delivery_t *current = pn_link_current(link); - if (delivery->updated && !delivery->local.settled) { - pni_add_work(connection, delivery); - } else if (delivery == current) { - if (link->endpoint.type == SENDER) { - if (pn_link_credit(link) > 0) { - pni_add_work(connection, delivery); - 
} else { - pni_clear_work(connection, delivery); - } - } else { - pni_add_work(connection, delivery); - } - } else { - pni_clear_work(connection, delivery); - } -} - -static void pni_add_tpwork(pn_delivery_t *delivery) -{ - pn_connection_t *connection = delivery->link->session->connection; - if (!delivery->tpwork) - { - LL_ADD(connection, tpwork, delivery); - delivery->tpwork = true; - } - pn_modified(connection, &connection->endpoint, true); -} - -void pn_clear_tpwork(pn_delivery_t *delivery) -{ - pn_connection_t *connection = delivery->link->session->connection; - if (delivery->tpwork) - { - LL_REMOVE(connection, tpwork, delivery); - delivery->tpwork = false; - if (pn_refcount(delivery) > 0) { - pn_incref(delivery); - pn_decref(delivery); - } - } -} - -void pn_dump(pn_connection_t *conn) -{ - pn_endpoint_t *endpoint = conn->transport_head; - while (endpoint) - { - printf("%p", (void *) endpoint); - endpoint = endpoint->transport_next; - if (endpoint) - printf(" -> "); - } - printf("\n"); -} - -void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit) -{ - if (!endpoint->modified) { - LL_ADD(connection, transport, endpoint); - endpoint->modified = true; - } - - if (emit && connection->transport) { - pn_collector_put(connection->collector, PN_OBJECT, connection->transport, - PN_TRANSPORT); - } -} - -void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint) -{ - if (endpoint->modified) { - LL_REMOVE(connection, transport, endpoint); - endpoint->transport_next = NULL; - endpoint->transport_prev = NULL; - endpoint->modified = false; - } -} - -static bool pni_matches(pn_endpoint_t *endpoint, pn_endpoint_type_t type, pn_state_t state) -{ - if (endpoint->type != type) return false; - - if (!state) return true; - - int st = endpoint->state; - if ((state & PN_REMOTE_MASK) == 0 || (state & PN_LOCAL_MASK) == 0) - return st & state; - else - return st == state; -} - -pn_endpoint_t *pn_find(pn_endpoint_t *endpoint, pn_endpoint_type_t 
type, pn_state_t state) -{ - while (endpoint) - { - if (pni_matches(endpoint, type, state)) - return endpoint; - endpoint = endpoint->endpoint_next; - } - return NULL; -} - -pn_session_t *pn_session_head(pn_connection_t *conn, pn_state_t state) -{ - if (conn) - return (pn_session_t *) pn_find(conn->endpoint_head, SESSION, state); - else - return NULL; -} - -pn_session_t *pn_session_next(pn_session_t *ssn, pn_state_t state) -{ - if (ssn) - return (pn_session_t *) pn_find(ssn->endpoint.endpoint_next, SESSION, state); - else - return NULL; -} - -pn_link_t *pn_link_head(pn_connection_t *conn, pn_state_t state) -{ - if (!conn) return NULL; - - pn_endpoint_t *endpoint = conn->endpoint_head; - - while (endpoint) - { - if (pni_matches(endpoint, SENDER, state) || pni_matches(endpoint, RECEIVER, state)) - return (pn_link_t *) endpoint; - endpoint = endpoint->endpoint_next; - } - - return NULL; -} - -pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state) -{ - if (!link) return NULL; - - pn_endpoint_t *endpoint = link->endpoint.endpoint_next; - - while (endpoint) - { - if (pni_matches(endpoint, SENDER, state) || pni_matches(endpoint, RECEIVER, state)) - return (pn_link_t *) endpoint; - endpoint = endpoint->endpoint_next; - } - - return NULL; -} - -static void pn_session_incref(void *object) -{ - pn_session_t *session = (pn_session_t *) object; - if (!session->endpoint.referenced) { - session->endpoint.referenced = true; - pn_incref(session->connection); - } else { - pn_object_incref(object); - } -} - -static bool pn_ep_bound(pn_endpoint_t *endpoint) -{ - pn_connection_t *conn = pni_ep_get_connection(endpoint); - pn_session_t *ssn; - pn_link_t *lnk; - - if (!conn->transport) return false; - if (endpoint->modified) return true; - - switch (endpoint->type) { - case CONNECTION: - return ((pn_connection_t *)endpoint)->transport; - case SESSION: - ssn = (pn_session_t *) endpoint; - return (((int16_t) ssn->state.local_channel) >= 0 || ((int16_t) ssn->state.remote_channel) >= 0); 
- case SENDER: - case RECEIVER: - lnk = (pn_link_t *) endpoint; - return ((int32_t) lnk->state.local_handle) >= 0 || ((int32_t) lnk->state.remote_handle) >= 0; - default: - assert(false); - return false; - } -} - -static bool pni_connection_live(pn_connection_t *conn) { - return pn_refcount(conn) > 1; -} - -static bool pni_session_live(pn_session_t *ssn) { - return pni_connection_live(ssn->connection) || pn_refcount(ssn) > 1; -} - -static bool pni_link_live(pn_link_t *link) { - return pni_session_live(link->session) || pn_refcount(link) > 1; -} - -static bool pni_endpoint_live(pn_endpoint_t *endpoint) { - switch (endpoint->type) { - case CONNECTION: - return pni_connection_live((pn_connection_t *)endpoint); - case SESSION: - return pni_session_live((pn_session_t *) endpoint); - case SENDER: - case RECEIVER: - return pni_link_live((pn_link_t *) endpoint); - default: - assert(false); - return false; - } -} - -static bool pni_preserve_child(pn_endpoint_t *endpoint) -{ - pn_connection_t *conn = pni_ep_get_connection(endpoint); - pn_endpoint_t *parent = pn_ep_parent(endpoint); - if (pni_endpoint_live(parent) && (!endpoint->freed || (pn_ep_bound(endpoint))) - && endpoint->referenced) { - pn_object_incref(endpoint); - endpoint->referenced = false; - pn_decref(parent); - return true; - } else { - LL_REMOVE(conn, transport, endpoint); - return false; - } -} - -static void pn_session_finalize(void *object) -{ - pn_session_t *session = (pn_session_t *) object; - pn_endpoint_t *endpoint = &session->endpoint; - - if (pni_preserve_child(endpoint)) { - return; - } - - pn_free(session->context); - pni_free_children(session->links, session->freed); - pni_endpoint_tini(endpoint); - pn_delivery_map_free(&session->state.incoming); - pn_delivery_map_free(&session->state.outgoing); - pn_free(session->state.local_handles); - pn_free(session->state.remote_handles); - pni_remove_session(session->connection, session); - pn_list_remove(session->connection->freed, session); - - if 
(session->connection->transport) { - pn_transport_t *transport = session->connection->transport; - pn_hash_del(transport->local_channels, session->state.local_channel); - pn_hash_del(transport->remote_channels, session->state.remote_channel); - } - - if (endpoint->referenced) { - pn_decref(session->connection); - } -} - -#define pn_session_new pn_object_new -#define pn_session_refcount pn_object_refcount -#define pn_session_decref pn_object_decref -#define pn_session_reify pn_object_reify -#define pn_session_initialize NULL -#define pn_session_hashcode NULL -#define pn_session_compare NULL -#define pn_session_inspect NULL - -pn_session_t *pn_session(pn_connection_t *conn) -{ - assert(conn); - - - pn_transport_t * transport = pn_connection_transport(conn); - - if(transport) { - // channel_max is an index, not a count. - if(pn_hash_size(transport->local_channels) > (size_t)transport->channel_max) { - pn_transport_logf(transport, - "pn_session: too many sessions: %d channel_max is %d", - pn_hash_size(transport->local_channels), - transport->channel_max); - return (pn_session_t *) 0; - } - } - -#define pn_session_free pn_object_free - static const pn_class_t clazz = PN_METACLASS(pn_session); -#undef pn_session_free - pn_session_t *ssn = (pn_session_t *) pn_class_new(&clazz, sizeof(pn_session_t)); - if (!ssn) return NULL; - pn_endpoint_init(&ssn->endpoint, SESSION, conn); - pni_add_session(conn, ssn); - ssn->links = pn_list(PN_WEAKREF, 0); - ssn->freed = pn_list(PN_WEAKREF, 0); - ssn->context = pn_record(); - ssn->incoming_capacity = 1024*1024; - ssn->incoming_bytes = 0; - ssn->outgoing_bytes = 0; - ssn->incoming_deliveries = 0; - ssn->outgoing_deliveries = 0; - ssn->outgoing_window = 2147483647; - - // begin transport state - memset(&ssn->state, 0, sizeof(ssn->state)); - ssn->state.local_channel = (uint16_t)-1; - ssn->state.remote_channel = (uint16_t)-1; - pn_delivery_map_init(&ssn->state.incoming, 0); - pn_delivery_map_init(&ssn->state.outgoing, 0); - 
ssn->state.local_handles = pn_hash(PN_WEAKREF, 0, 0.75); - ssn->state.remote_handles = pn_hash(PN_WEAKREF, 0, 0.75); - // end transport state - - pn_collector_put(conn->collector, PN_OBJECT, ssn, PN_SESSION_INIT); - if (conn->transport) { - pni_session_bound(ssn); - } - pn_decref(ssn); - return ssn; -} - -static void pni_session_bound(pn_session_t *ssn) -{ - assert(ssn); - size_t nlinks = pn_list_size(ssn->links); - for (size_t i = 0; i < nlinks; i++) { - pni_link_bound((pn_link_t *) pn_list_get(ssn->links, i)); - } -} - -void pn_session_unbound(pn_session_t* ssn) -{ - assert(ssn); - ssn->state.local_channel = (uint16_t)-1; - ssn->state.remote_channel = (uint16_t)-1; - ssn->incoming_bytes = 0; - ssn->outgoing_bytes = 0; - ssn->incoming_deliveries = 0; - ssn->outgoing_deliveries = 0; -} - -size_t pn_session_get_incoming_capacity(pn_session_t *ssn) -{ - assert(ssn); - return ssn->incoming_capacity; -} - -void pn_session_set_incoming_capacity(pn_session_t *ssn, size_t capacity) -{ - assert(ssn); - // XXX: should this trigger a flow? 
  ssn->incoming_capacity = capacity;
}

/* Return the outgoing window stored on the session. */
size_t pn_session_get_outgoing_window(pn_session_t *ssn)
{
  assert(ssn);
  return ssn->outgoing_window;
}

/* Store a new outgoing window on the session. */
void pn_session_set_outgoing_window(pn_session_t *ssn, size_t window)
{
  assert(ssn);
  ssn->outgoing_window = window;
}

/* Return the session's outgoing byte counter. */
size_t pn_session_outgoing_bytes(pn_session_t *ssn)
{
  assert(ssn);
  return ssn->outgoing_bytes;
}

/* Return the session's incoming byte counter. */
size_t pn_session_incoming_bytes(pn_session_t *ssn)
{
  assert(ssn);
  return ssn->incoming_bytes;
}

/* Return the endpoint state of the session (no NULL check is performed). */
pn_state_t pn_session_state(pn_session_t *session)
{
  return session->endpoint.state;
}

/* Return the error object held by the session's endpoint (no NULL check). */
pn_error_t *pn_session_error(pn_session_t *session)
{
  return session->endpoint.error;
}

/*
 * Put a terminus into its default state: the given type, a NULL address
 * string, non-durable, expire-with-session, zero timeout, not dynamic,
 * unspecified distribution mode and four freshly created pn_data_t objects.
 */
static void pni_terminus_init(pn_terminus_t *terminus, pn_terminus_type_t type)
{
  terminus->type = type;
  terminus->address = pn_string(NULL);
  terminus->durability = PN_NONDURABLE;
  terminus->expiry_policy = PN_EXPIRE_WITH_SESSION;
  terminus->timeout = 0;
  terminus->dynamic = false;
  terminus->distribution_mode = PN_DIST_MODE_UNSPECIFIED;
  terminus->properties = pn_data(0);
  terminus->capabilities = pn_data(0);
  terminus->outcomes = pn_data(0);
  terminus->filter = pn_data(0);
}

/*
 * incref hook for the link class.  While the endpoint is not flagged as
 * `referenced`, an incref sets the flag and takes a reference on the owning
 * session instead of bumping the link's own count; otherwise the generic
 * object incref is used.
 */
static void pn_link_incref(void *object)
{
  pn_link_t *link = (pn_link_t *) object;
  if (!link->endpoint.referenced) {
    link->endpoint.referenced = true;
    pn_incref(link->session);
  } else {
    pn_object_incref(object);
  }
}

/*
 * finalize hook for the link class.  Does nothing when pni_preserve_child()
 * reports that the endpoint must be kept.  Otherwise frees the remaining
 * unsettled deliveries, the link's own members, unregisters the link from its
 * session and finally drops the session reference if one was held.
 */
static void pn_link_finalize(void *object)
{
  pn_link_t *link = (pn_link_t *) object;
  pn_endpoint_t *endpoint = &link->endpoint;

  if (pni_preserve_child(endpoint)) {
    return;
  }

  // NOTE(review): this loop only terminates if freeing the head delivery
  // unlinks it from the unsettled list -- presumably done by the delivery
  // finalizer; confirm.
  while (link->unsettled_head) {
    assert(!link->unsettled_head->referenced);
    pn_free(link->unsettled_head);
  }

  pn_free(link->context);
  pni_terminus_free(&link->source);
  pni_terminus_free(&link->target);
  pni_terminus_free(&link->remote_source);
  pni_terminus_free(&link->remote_target);
  pn_free(link->name);
  pni_endpoint_tini(endpoint);
  pni_remove_link(link->session, link);
  pn_hash_del(link->session->state.local_handles, link->state.local_handle);
  pn_hash_del(link->session->state.remote_handles, link->state.remote_handle);
  pn_list_remove(link->session->freed, link);
  // The session is touched for the last time here, after all uses above.
  if (endpoint->referenced) {
    pn_decref(link->session);
  }
}

// Class hooks consumed by PN_METACLASS(pn_link) below; the remaining hooks
// (incref, finalize) are the functions defined above.
#define pn_link_refcount pn_object_refcount
#define pn_link_decref pn_object_decref
#define pn_link_reify pn_object_reify
#define pn_link_initialize NULL
#define pn_link_hashcode NULL
#define pn_link_compare NULL
#define pn_link_inspect NULL

/*
 * Create a link of the given endpoint type on `session`, named `name`.
 *
 * The new link is registered with the session, all fields are set to their
 * defaults, a PN_LINK_INIT event is put on the connection's collector and,
 * if the connection already has a transport, pni_link_bound() is invoked.
 */
pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name)
{
  // pn_link_new / pn_link_free are aliased only while the class descriptor is
  // expanded -- presumably because PN_METACLASS refers to those names and
  // pn_link_new would otherwise mean this very function (TODO confirm).
#define pn_link_new pn_object_new
#define pn_link_free pn_object_free
  static const pn_class_t clazz = PN_METACLASS(pn_link);
#undef pn_link_new
#undef pn_link_free
  pn_link_t *link = (pn_link_t *) pn_class_new(&clazz, sizeof(pn_link_t));

  pn_endpoint_init(&link->endpoint, type, session->connection);
  pni_add_link(session, link);
  pn_incref(session);  // keep session until link finalized
  link->name = pn_string(name);
  pni_terminus_init(&link->source, PN_SOURCE);
  pni_terminus_init(&link->target, PN_TARGET);
  pni_terminus_init(&link->remote_source, PN_UNSPECIFIED);
  pni_terminus_init(&link->remote_target, PN_UNSPECIFIED);
  link->unsettled_head = link->unsettled_tail = link->current = NULL;
  link->unsettled_count = 0;
  link->max_message_size = 0;
  link->remote_max_message_size = 0;
  link->available = 0;
  link->credit = 0;
  link->queued = 0;
  link->drain = false;
  link->drain_flag_mode = true;
  link->drained = 0;
  link->context = pn_record();
  link->snd_settle_mode = PN_SND_MIXED;
  link->rcv_settle_mode = PN_RCV_FIRST;
  link->remote_snd_settle_mode = PN_SND_MIXED;
  link->remote_rcv_settle_mode = PN_RCV_FIRST;
  link->detached = false;

  // begin transport state
  link->state.local_handle = -1;
  link->state.remote_handle = -1;
  link->state.delivery_count = 0;
  link->state.link_credit = 0;
  // end transport state

  pn_collector_put(session->connection->collector, PN_OBJECT, link, PN_LINK_INIT);
  if (session->connection->transport) {
    pni_link_bound(link);
  }
  // NOTE(review): the pointer is returned after this decref, so the link is
  // expected to stay alive through some other owner -- confirm which.
  pn_decref(link);
  return link;
}

/* Hook invoked when a link is created on a connection that has a transport;
 * currently empty. */
static void pni_link_bound(pn_link_t *link)
{
}

/* Reset the link's transport state to the same values pn_link_new() uses. */
void pn_link_unbound(pn_link_t* link)
{
  assert(link);
  link->state.local_handle = -1;
  link->state.remote_handle = -1;
  link->state.delivery_count = 0;
  link->state.link_credit = 0;
}

/* Return the link's local source terminus, or NULL for a NULL link. */
pn_terminus_t *pn_link_source(pn_link_t *link)
{
  return link ? &link->source : NULL;
}

/* Return the link's local target terminus, or NULL for a NULL link. */
pn_terminus_t *pn_link_target(pn_link_t *link)
{
  return link ? &link->target : NULL;
}

/* Return the link's remote source terminus, or NULL for a NULL link. */
pn_terminus_t *pn_link_remote_source(pn_link_t *link)
{
  return link ? &link->remote_source : NULL;
}

/* Return the link's remote target terminus, or NULL for a NULL link. */
pn_terminus_t *pn_link_remote_target(pn_link_t *link)
{
  return link ? &link->remote_target : NULL;
}

/* Set the terminus type.  Returns 0, or PN_ARG_ERR for a NULL terminus. */
int pn_terminus_set_type(pn_terminus_t *terminus, pn_terminus_type_t type)
{
  if (!terminus) return PN_ARG_ERR;
  terminus->type = type;
  return 0;
}

/* Return the terminus type, or the zero enum value for a NULL terminus. */
pn_terminus_type_t pn_terminus_get_type(pn_terminus_t *terminus)
{
  return terminus ? terminus->type : (pn_terminus_type_t) 0;
}

/* Return the address string; unlike most getters here this asserts on NULL. */
const char *pn_terminus_get_address(pn_terminus_t *terminus)
{
  assert(terminus);
  return pn_string_get(terminus->address);
}

/* Set the address string; returns the result of pn_string_set(). */
int pn_terminus_set_address(pn_terminus_t *terminus, const char *address)
{
  assert(terminus);
  return pn_string_set(terminus->address, address);
}

/* Return the durability, or the zero enum value for a NULL terminus. */
pn_durability_t pn_terminus_get_durability(pn_terminus_t *terminus)
{
  return terminus ? terminus->durability : (pn_durability_t) 0;
}

/* Set the durability.  Returns 0, or PN_ARG_ERR for a NULL terminus. */
int pn_terminus_set_durability(pn_terminus_t *terminus, pn_durability_t durability)
{
  if (!terminus) return PN_ARG_ERR;
  terminus->durability = durability;
  return 0;
}

/* Return the expiry policy, or the zero enum value for a NULL terminus. */
pn_expiry_policy_t pn_terminus_get_expiry_policy(pn_terminus_t *terminus)
{
  return terminus ? terminus->expiry_policy : (pn_expiry_policy_t) 0;
}

/* Set the expiry policy.  Returns 0, or PN_ARG_ERR for a NULL terminus. */
int pn_terminus_set_expiry_policy(pn_terminus_t *terminus, pn_expiry_policy_t expiry_policy)
{
  if (!terminus) return PN_ARG_ERR;
  terminus->expiry_policy = expiry_policy;
  return 0;
}

/* Return the timeout, or 0 for a NULL terminus. */
pn_seconds_t pn_terminus_get_timeout(pn_terminus_t *terminus)
{
  return terminus ? terminus->timeout : 0;
}

/* Set the timeout.  Returns 0, or PN_ARG_ERR for a NULL terminus. */
int pn_terminus_set_timeout(pn_terminus_t *terminus, pn_seconds_t timeout)
{
  if (!terminus) return PN_ARG_ERR;
  terminus->timeout = timeout;
  return 0;
}

/* Return the dynamic flag, or false for a NULL terminus. */
bool pn_terminus_is_dynamic(pn_terminus_t *terminus)
{
  return terminus ? terminus->dynamic : false;
}

/* Set the dynamic flag.  Returns 0, or PN_ARG_ERR for a NULL terminus. */
int pn_terminus_set_dynamic(pn_terminus_t *terminus, bool dynamic)
{
  if (!terminus) return PN_ARG_ERR;
  terminus->dynamic = dynamic;
  return 0;
}

/* Return the properties data object, or NULL for a NULL terminus. */
pn_data_t *pn_terminus_properties(pn_terminus_t *terminus)
{
  return terminus ? terminus->properties : NULL;
}

/* Return the capabilities data object, or NULL for a NULL terminus. */
pn_data_t *pn_terminus_capabilities(pn_terminus_t *terminus)
{
  return terminus ? terminus->capabilities : NULL;
}

/* Return the outcomes data object, or NULL for a NULL terminus. */
pn_data_t *pn_terminus_outcomes(pn_terminus_t *terminus)
{
  return terminus ? terminus->outcomes : NULL;
}

/* Return the filter data object, or NULL for a NULL terminus. */
pn_data_t *pn_terminus_filter(pn_terminus_t *terminus)
{
  return terminus ? terminus->filter : NULL;
}

/* Return the distribution mode, or PN_DIST_MODE_UNSPECIFIED for NULL. */
pn_distribution_mode_t pn_terminus_get_distribution_mode(const pn_terminus_t *terminus)
{
  return terminus ? terminus->distribution_mode : PN_DIST_MODE_UNSPECIFIED;
}

/* Set the distribution mode.  Returns 0, or PN_ARG_ERR for a NULL terminus. */
int pn_terminus_set_distribution_mode(pn_terminus_t *terminus, pn_distribution_mode_t m)
{
  if (!terminus) return PN_ARG_ERR;
  terminus->distribution_mode = m;
  return 0;
}

/*
 * Copy every field of `src` into `terminus`.
 *
 * Returns PN_ARG_ERR if either argument is NULL, otherwise the first non-zero
 * error from the address/data copies, or 0.  Fields are copied in order, so
 * on failure `terminus` holds whatever had been copied up to that point.
 */
int pn_terminus_copy(pn_terminus_t *terminus, pn_terminus_t *src)
{
  if (!terminus || !src) {
    return PN_ARG_ERR;
  }

  terminus->type = src->type;
  int err = pn_terminus_set_address(terminus, pn_terminus_get_address(src));
  if (err) return err;
  terminus->durability = src->durability;
  terminus->expiry_policy = src->expiry_policy;
  terminus->timeout = src->timeout;
  terminus->dynamic = src->dynamic;
  terminus->distribution_mode = src->distribution_mode;
  err = pn_data_copy(terminus->properties, src->properties);
  if (err) return err;
  err = pn_data_copy(terminus->capabilities, src->capabilities);
  if (err) return err;
  err = pn_data_copy(terminus->outcomes, src->outcomes);
  if (err) return err;
  err = pn_data_copy(terminus->filter, src->filter);
  if (err) return err;
  return 0;
}

/* Create a SENDER link on the session; see pn_link_new(). */
pn_link_t *pn_sender(pn_session_t *session, const char *name)
{
  return pn_link_new(SENDER, session, name);
}

/* Create a RECEIVER link on the session; see pn_link_new(). */
pn_link_t *pn_receiver(pn_session_t *session, const char *name)
{
  return pn_link_new(RECEIVER, session, name);
}

/* Return the endpoint state of the link (no NULL check is performed). */
pn_state_t pn_link_state(pn_link_t *link)
{
  return link->endpoint.state;
}

/* Return the error object held by the link's endpoint (no NULL check). */
pn_error_t *pn_link_error(pn_link_t *link)
{
  return link->endpoint.error;
}

/* Return the link name as a C string. */
const char *pn_link_name(pn_link_t *link)
{
  assert(link);
  return pn_string_get(link->name);
}

/* True when the link's endpoint type is SENDER. */
bool pn_link_is_sender(pn_link_t *link)
{
  return link->endpoint.type == SENDER;
}

/* True when the link's endpoint type is RECEIVER. */
bool pn_link_is_receiver(pn_link_t *link)
{
  return link->endpoint.type == RECEIVER;
}

/* Return the session the link belongs to. */
pn_session_t *pn_link_session(pn_link_t *link)
{
  assert(link);
  return link->session;
}

/* Release the data, annotations and condition owned by a disposition. */
static void pn_disposition_finalize(pn_disposition_t *ds)
{
  pn_free(ds->data);
  pn_free(ds->annotations);
  pn_condition_tini(&ds->condition);
}

/*
 * incref hook for the delivery class.  For a delivery that is attached to a
 * link but not yet flagged `referenced`, the incref sets the flag and takes a
 * reference on the link; in every other case the generic object incref is
 * used.
 */
static void pn_delivery_incref(void *object)
{
  pn_delivery_t *delivery = (pn_delivery_t *) object;
  if (delivery->link && !delivery->referenced) {
    delivery->referenced = true;
    pn_incref(delivery->link);
  } else {
    pn_object_incref(object);
  }
}

/*
 * True when the delivery must outlive its last application reference: it is
 * not locally settled yet, or the connection has a transport and the delivery
 * is either initialised in the transport state or queued as transport work.
 * Dereferences delivery->link, so the caller must ensure it is non-NULL.
 */
static bool pni_preserve_delivery(pn_delivery_t *delivery)
{
  pn_connection_t *conn = delivery->link->session->connection;
  return !delivery->local.settled || (conn->transport && (delivery->state.init || delivery->tpwork));
}

/*
 * finalize hook for the delivery class.
 *
 * Three outcomes are possible:
 *  - the delivery is preserved: its `referenced` flag is cleared, it gets an
 *    object reference of its own, the link reference is dropped and the
 *    function returns without freeing anything;
 *  - the delivery is detached from its link, cleared and parked in the
 *    connection's delivery pool (when the connection is still live);
 *  - the delivery's members are freed.
 */
static void pn_delivery_finalize(void *object)
{
  pn_delivery_t *delivery = (pn_delivery_t *) object;
  pn_link_t *link = delivery->link;
  //  assert(!delivery->state.init);

  bool pooled = false;
  bool referenced = true;
  if (link) {
    if (pni_link_live(link) && pni_preserve_delivery(delivery) && delivery->referenced) {
      delivery->referenced = false;
      pn_object_incref(delivery);
      pn_decref(link);
      return;
    }
    // Remember the flag now; it decides at the end whether the link
    // reference still has to be released.
    referenced = delivery->referenced;

    pn_clear_tpwork(delivery);
    LL_REMOVE(link, unsettled, delivery);
    pn_delivery_map_del(pn_link_is_sender(link)
                        ? &link->session->state.outgoing
                        : &link->session->state.incoming,
                        delivery);
    pn_buffer_clear(delivery->tag);
    pn_buffer_clear(delivery->bytes);
    pn_record_clear(delivery->context);
    delivery->settled = true;
    pn_connection_t *conn = link->session->connection;
    assert(pn_refcount(delivery) == 0);
    if (pni_connection_live(conn)) {
      pn_list_t *pool = link->session->connection->delivery_pool;
      delivery->link = NULL;
      pn_list_add(pool, delivery);
      pooled = true;
      // The assert documents that adding to the pool leaves the count at 1.
      assert(pn_refcount(delivery) == 1);
    }
  }

  if (!pooled) {
    pn_free(delivery->context);
    pn_buffer_free(delivery->tag);
    pn_buffer_free(delivery->bytes);
    pn_disposition_finalize(&delivery->local);
    pn_disposition_finalize(&delivery->remote);
  }

  // `link` is the value captured on entry; it is NULL when the delivery had
  // no link, in which case `referenced` is still its initial `true`.
  if (referenced) {
    pn_decref(link);
  }
}

/* Create the data and annotations objects and initialise the condition. */
static void pn_disposition_init(pn_disposition_t *ds)
{
  ds->data = pn_data(0);
  ds->annotations = pn_data(0);
  pn_condition_init(&ds->condition);
}

/* Reset a disposition to its empty state without releasing its members. */
static void pn_disposition_clear(pn_disposition_t *ds)
{
  ds->type = 0;
  ds->section_number = 0;
  ds->section_offset = 0;
  ds->failed = false;
  ds->undeliverable = false;
  ds->settled = false;
  pn_data_clear(ds->data);
  pn_data_clear(ds->annotations);
  pn_condition_clear(&ds->condition);
}

// Class hooks consumed by PN_METACLASS(pn_delivery) in pn_delivery(); incref
// and finalize are the functions defined above.
#define pn_delivery_new pn_object_new
#define pn_delivery_refcount pn_object_refcount
#define pn_delivery_decref pn_object_decref
#define pn_delivery_free pn_object_free
#define pn_delivery_reify pn_object_reify
#define pn_delivery_initialize NULL
#define pn_delivery_hashcode NULL
#define pn_delivery_compare NULL
#define pn_delivery_inspect NULL

/* Build a delivery tag value from a byte pointer and a length; the bytes are
 * referenced, not copied. */
pn_delivery_tag_t pn_dtag(const char *bytes, size_t size) {
  pn_delivery_tag_t dtag = {size, bytes};
  return dtag;
}

/*
 * Create a delivery with the given tag on `link`.
 *
 * A delivery object is taken from the connection's delivery pool when one is
 * available, otherwise a new one is allocated together with its buffers,
 * dispositions and context record.  The delivery is appended to the link's
 * unsettled list and becomes the link's current delivery if there was none.
 * Returns NULL only when allocating a new delivery object fails.
 */
pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag)
{
  assert(link);
  pn_list_t *pool = link->session->connection->delivery_pool;
  pn_delivery_t *delivery = (pn_delivery_t *) pn_list_pop(pool);
  if (!delivery) {
    static const pn_class_t clazz = PN_METACLASS(pn_delivery);
    delivery = (pn_delivery_t *) pn_class_new(&clazz, sizeof(pn_delivery_t));
    if (!delivery) return NULL;
    delivery->tag = pn_buffer(16);
    delivery->bytes = pn_buffer(64);
    pn_disposition_init(&delivery->local);
    pn_disposition_init(&delivery->remote);
    delivery->context = pn_record();
  } else {
    assert(!delivery->state.init);
  }
  delivery->link = link;
  pn_incref(delivery->link);  // keep link until finalized
  pn_buffer_clear(delivery->tag);
  pn_buffer_append(delivery->tag, tag.start, tag.size);
  pn_disposition_clear(&delivery->local);
  pn_disposition_clear(&delivery->remote);
  delivery->updated = false;
  delivery->settled = false;
  LL_ADD(link, unsettled, delivery);
  delivery->referenced = true;
  delivery->work_next = NULL;
  delivery->work_prev = NULL;
  delivery->work = false;
  delivery->tpwork_next = NULL;
  delivery->tpwork_prev = NULL;
  delivery->tpwork = false;
  pn_buffer_clear(delivery->bytes);
  delivery->done = false;
  pn_record_clear(delivery->context);

  // begin delivery state
  delivery->state.init = false;
  delivery->state.sent = false;
  // end delivery state

  if (!link->current)
    link->current = delivery;

  link->unsettled_count++;

  pn_work_update(link->session->connection, delivery);

  // XXX: could just remove incref above
  pn_decref(delivery);

  return delivery;
}

/*
 * True for an unsettled delivery on a sender link that has not been marked
 * sent yet and is either complete (`done`) or has bytes in its buffer.
 * Always false for receivers and for settled deliveries.
 */
bool pn_delivery_buffered(pn_delivery_t *delivery)
{
  assert(delivery);
  if (delivery->settled) return false;
  if (pn_link_is_sender(delivery->link)) {
    pn_delivery_state_t *state = &delivery->state;
    if (state->sent) {
      return false;
    } else {
      return delivery->done || (pn_buffer_size(delivery->bytes) > 0);
    }
  } else {
    return false;
  }
}

/* Return the link's unsettled-delivery counter. */
int pn_link_unsettled(pn_link_t *link)
{
  return link->unsettled_count;
}

/* Return the first delivery on the link's unsettled list that is not locally
 * settled, or NULL if there is none. */
pn_delivery_t *pn_unsettled_head(pn_link_t *link)
{
  pn_delivery_t *d = link->unsettled_head;
  while (d && d->local.settled) {
    d = d->unsettled_next;
  }
  return d;
}

-pn_delivery_t *pn_unsettled_next(pn_delivery_t *delivery) -{ - pn_delivery_t *d = delivery->unsettled_next; - while (d && d->local.settled) { - d = d->unsettled_next; - } - return d; -} - -bool pn_delivery_current(pn_delivery_t *delivery) -{ - pn_link_t *link = delivery->link; - return pn_link_current(link) == delivery; -} - -void pn_delivery_dump(pn_delivery_t *d) -{ - char tag[1024]; - pn_bytes_t bytes = pn_buffer_bytes(d->tag); - pn_quote_data(tag, 1024, bytes.start, bytes.size); - printf("{tag=%s, local.type=%" PRIu64 ", remote.type=%" PRIu64 ", local.settled=%u, " - "remote.settled=%u, updated=%u, current=%u, writable=%u, readable=%u, " - "work=%u}", - tag, d->local.type, d->remote.type, d->local.settled, - d->remote.settled, d->updated, pn_delivery_current(d), - pn_delivery_writable(d), pn_delivery_readable(d), d->work); -} - -void *pn_delivery_get_context(pn_delivery_t *delivery) -{ - assert(delivery); - return pn_record_get(delivery->context, PN_LEGCTX); -} - -void pn_delivery_set_context(pn_delivery_t *delivery, void *context) -{ - assert(delivery); - pn_record_set(delivery->context, PN_LEGCTX, context); -} - -pn_record_t *pn_delivery_attachments(pn_delivery_t *delivery) -{ - assert(delivery); - return delivery->context; -} - -uint64_t pn_disposition_type(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->type; -} - -pn_data_t *pn_disposition_data(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->data; -} - -uint32_t pn_disposition_get_section_number(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->section_number; -} - -void pn_disposition_set_section_number(pn_disposition_t *disposition, uint32_t section_number) -{ - assert(disposition); - disposition->section_number = section_number; -} - -uint64_t pn_disposition_get_section_offset(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->section_offset; -} - -void 
pn_disposition_set_section_offset(pn_disposition_t *disposition, uint64_t section_offset) -{ - assert(disposition); - disposition->section_offset = section_offset; -} - -bool pn_disposition_is_failed(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->failed; -} - -void pn_disposition_set_failed(pn_disposition_t *disposition, bool failed) -{ - assert(disposition); - disposition->failed = failed; -} - -bool pn_disposition_is_undeliverable(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->undeliverable; -} - -void pn_disposition_set_undeliverable(pn_disposition_t *disposition, bool undeliverable) -{ - assert(disposition); - disposition->undeliverable = undeliverable; -} - -pn_data_t *pn_disposition_annotations(pn_disposition_t *disposition) -{ - assert(disposition); - return disposition->annotations; -} - -pn_condition_t *pn_disposition_condition(pn_disposition_t *disposition) -{ - assert(disposition); - return &disposition->condition; -} - -pn_delivery_tag_t pn_delivery_tag(pn_delivery_t *delivery) -{ - if (delivery) { - pn_bytes_t tag = pn_buffer_bytes(delivery->tag); - return pn_dtag(tag.start, tag.size); - } else { - return pn_dtag(0, 0); - } -} - -pn_delivery_t *pn_link_current(pn_link_t *link) -{ - if (!link) return NULL; - return link->current; -} - -static void pni_advance_sender(pn_link_t *link) -{ - link->current->done = true; - link->queued++; - link->credit--; - link->session->outgoing_deliveries++; - pni_add_tpwork(link->current); - link->current = link->current->unsettled_next; -} - -static void pni_advance_receiver(pn_link_t *link) -{ - link->credit--; - link->queued--; - link->session->incoming_deliveries--; - - pn_delivery_t *current = link->current; - link->session->incoming_bytes -= pn_buffer_size(current->bytes); - pn_buffer_clear(current->bytes); - - if (!link->session->state.incoming_window) { - pni_add_tpwork(current); - } - - link->current = link->current->unsettled_next; -} - -bool 
pn_link_advance(pn_link_t *link) -{ - if (link && link->current) { - pn_delivery_t *prev = link->current; - if (link->endpoint.type == SENDER) { - pni_advance_sender(link); - } else { - pni_advance_receiver(link); - } - pn_delivery_t *next = link->current; - pn_work_update(link->session->connection, prev); - if (next) pn_work_update(link->session->connection, next); - return prev != next; - } else { - return false; - } -} - -int pn_link_credit(pn_link_t *link) -{ - return link ? link->credit : 0; -} - -int pn_link_available(pn_link_t *link) -{ - return link ? link->available : 0; -} - -int pn_link_queued(pn_link_t *link) -{ - return link ? link->queued : 0; -} - -int pn_link_remote_credit(pn_link_t *link) -{ - assert(link); - return link->credit - link->queued; -} - -bool pn_link_get_drain(pn_link_t *link) -{ - assert(link); - return link->drain; -} - -pn_snd_settle_mode_t pn_link_snd_settle_mode(pn_link_t *link) -{ - return link ? (pn_snd_settle_mode_t)link->snd_settle_mode - : PN_SND_MIXED; -} - -pn_rcv_settle_mode_t pn_link_rcv_settle_mode(pn_link_t *link) -{ - return link ? (pn_rcv_settle_mode_t)link->rcv_settle_mode - : PN_RCV_FIRST; -} - -pn_snd_settle_mode_t pn_link_remote_snd_settle_mode(pn_link_t *link) -{ - return link ? (pn_snd_settle_mode_t)link->remote_snd_settle_mode - : PN_SND_MIXED; -} - -pn_rcv_settle_mode_t pn_link_remote_rcv_settle_mode(pn_link_t *link) -{ - return link ? 
(pn_rcv_settle_mode_t)link->remote_rcv_settle_mode - : PN_RCV_FIRST; -} -void pn_link_set_snd_settle_mode(pn_link_t *link, pn_snd_settle_mode_t mode) -{ - if (link) - link->snd_settle_mode = (uint8_t)mode; -} -void pn_link_set_rcv_settle_mode(pn_link_t *link, pn_rcv_settle_mode_t mode) -{ - if (link) - link->rcv_settle_mode = (uint8_t)mode; -} - -void pn_delivery_settle(pn_delivery_t *delivery) -{ - assert(delivery); - if (!delivery->local.settled) { - pn_link_t *link = delivery->link; - if (pn_delivery_current(delivery)) { - pn_link_advance(link); - } - - link->unsettled_count--; - delivery->local.settled = true; - pni_add_tpwork(delivery); - pn_work_update(delivery->link->session->connection, delivery); - pn_incref(delivery); - pn_decref(delivery); - } -} - -void pn_link_offered(pn_link_t *sender, int credit) -{ - sender->available = credit; -} - -ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n) -{ - pn_delivery_t *current = pn_link_current(sender); - if (!current) return PN_EOS; - if (!bytes || !n) return 0; - pn_buffer_append(current->bytes, bytes, n); - sender->session->outgoing_bytes += n; - pni_add_tpwork(current); - return n; -} - -int pn_link_drained(pn_link_t *link) -{ - assert(link); - int drained = 0; - - if (pn_link_is_sender(link)) { - if (link->drain && link->credit > 0) { - link->drained = link->credit; - link->credit = 0; - pn_modified(link->session->connection, &link->endpoint, true); - drained = link->drained; - } - } else { - drained = link->drained; - link->drained = 0; - } - - return drained; -} - -ssize_t pn_link_recv(pn_link_t *receiver, char *bytes, size_t n) -{ - if (!receiver) return PN_ARG_ERR; - - pn_delivery_t *delivery = receiver->current; - if (delivery) { - size_t size = pn_buffer_get(delivery->bytes, 0, n, bytes); - pn_buffer_trim(delivery->bytes, size, 0); - if (size) { - receiver->session->incoming_bytes -= size; - if (!receiver->session->state.incoming_window) { - pni_add_tpwork(delivery); - } - return size; 
- } else { - return delivery->done ? PN_EOS : 0; - } - } else { - return PN_STATE_ERR; - } -} - -void pn_link_flow(pn_link_t *receiver, int credit) -{ - assert(receiver); - assert(pn_link_is_receiver(receiver)); - receiver->credit += credit; - pn_modified(receiver->session->connection, &receiver->endpoint, true); - if (!receiver->drain_flag_mode) { - pn_link_set_drain(receiver, false); - receiver->drain_flag_mode = false; - } -} - -void pn_link_drain(pn_link_t *receiver, int credit) -{ - assert(receiver); - assert(pn_link_is_receiver(receiver)); - pn_link_set_drain(receiver, true); - pn_link_flow(receiver, credit); - receiver->drain_flag_mode = false; -} - -void pn_link_set_drain(pn_link_t *receiver, bool drain) -{ - assert(receiver); - assert(pn_link_is_receiver(receiver)); - receiver->drain = drain; - pn_modified(receiver->session->connection, &receiver->endpoint, true); - receiver->drain_flag_mode = true; -} - -bool pn_link_draining(pn_link_t *receiver) -{ - assert(receiver); - assert(pn_link_is_receiver(receiver)); - return receiver->drain && (pn_link_credit(receiver) > pn_link_queued(receiver)); -} - -uint64_t pn_link_max_message_size(pn_link_t *link) -{ - return link->max_message_size; -} - -void pn_link_set_max_message_size(pn_link_t *link, uint64_t size) -{ - link->max_message_size = size; -} - -uint64_t pn_link_remote_max_message_size(pn_link_t *link) -{ - return link->remote_max_message_size; -} - -pn_link_t *pn_delivery_link(pn_delivery_t *delivery) -{ - assert(delivery); - return delivery->link; -} - -pn_disposition_t *pn_delivery_local(pn_delivery_t *delivery) -{ - assert(delivery); - return &delivery->local; -} - -uint64_t pn_delivery_local_state(pn_delivery_t *delivery) -{ - assert(delivery); - return delivery->local.type; -} - -pn_disposition_t *pn_delivery_remote(pn_delivery_t *delivery) -{ - assert(delivery); - return &delivery->remote; -} - -uint64_t pn_delivery_remote_state(pn_delivery_t *delivery) -{ - assert(delivery); - return 
delivery->remote.type; -} - -bool pn_delivery_settled(pn_delivery_t *delivery) -{ - return delivery ? delivery->remote.settled : false; -} - -bool pn_delivery_updated(pn_delivery_t *delivery) -{ - return delivery ? delivery->updated : false; -} - -void pn_delivery_clear(pn_delivery_t *delivery) -{ - delivery->updated = false; - pn_work_update(delivery->link->session->connection, delivery); -} - -void pn_delivery_update(pn_delivery_t *delivery, uint64_t state) -{ - if (!delivery) return; - delivery->local.type = state; - pni_add_tpwork(delivery); -} - -bool pn_delivery_writable(pn_delivery_t *delivery) -{ - if (!delivery) return false; - - pn_link_t *link = delivery->link; - return pn_link_is_sender(link) && pn_delivery_current(delivery) && pn_link_credit(link) > 0; -} - -bool pn_delivery_readable(pn_delivery_t *delivery) -{ - if (delivery) { - pn_link_t *link = delivery->link; - return pn_link_is_receiver(link) && pn_delivery_current(delivery); - } else { - return false; - } -} - -size_t pn_delivery_pending(pn_delivery_t *delivery) -{ - return pn_buffer_size(delivery->bytes); -} - -bool pn_delivery_partial(pn_delivery_t *delivery) -{ - return !delivery->done; -} - -pn_condition_t *pn_connection_condition(pn_connection_t *connection) -{ - assert(connection); - return &connection->endpoint.condition; -} - -pn_condition_t *pn_connection_remote_condition(pn_connection_t *connection) -{ - assert(connection); - pn_transport_t *transport = connection->transport; - return transport ? 
&transport->remote_condition : NULL; -} - -pn_condition_t *pn_session_condition(pn_session_t *session) -{ - assert(session); - return &session->endpoint.condition; -} - -pn_condition_t *pn_session_remote_condition(pn_session_t *session) -{ - assert(session); - return &session->endpoint.remote_condition; -} - -pn_condition_t *pn_link_condition(pn_link_t *link) -{ - assert(link); - return &link->endpoint.condition; -} - -pn_condition_t *pn_link_remote_condition(pn_link_t *link) -{ - assert(link); - return &link->endpoint.remote_condition; -} - -bool pn_condition_is_set(pn_condition_t *condition) -{ - return condition && pn_string_get(condition->name); -} - -void pn_condition_clear(pn_condition_t *condition) -{ - assert(condition); - pn_string_clear(condition->name); - pn_string_clear(condition->description); - pn_data_clear(condition->info); -} - -const char *pn_condition_get_name(pn_condition_t *condition) -{ - assert(condition); - return pn_string_get(condition->name); -} - -int pn_condition_set_name(pn_condition_t *condition, const char *name) -{ - assert(condition); - return pn_string_set(condition->name, name); -} - -const char *pn_condition_get_description(pn_condition_t *condition) -{ - assert(condition); - return pn_string_get(condition->description); -} - -int pn_condition_set_description(pn_condition_t *condition, const char *description) -{ - assert(condition); - return pn_string_set(condition->description, description); -} - -int pn_condition_vformat(pn_condition_t *condition, const char *name, const char *fmt, va_list ap) -{ - assert(condition); - int err = pn_condition_set_name(condition, name); - if (err) - return err; - - char text[1024]; - size_t n = pni_vsnprintf(text, 1024, fmt, ap); - if (n >= sizeof(text)) - text[sizeof(text)-1] = '\0'; - err = pn_condition_set_description(condition, text); - return err; -} - -int pn_condition_format(pn_condition_t *condition, const char *name, const char *fmt, ...) 
-{ - assert(condition); - va_list ap; - va_start(ap, fmt); - int err = pn_condition_vformat(condition, name, fmt, ap); - va_end(ap); - return err; -} - -pn_data_t *pn_condition_info(pn_condition_t *condition) -{ - assert(condition); - return condition->info; -} - -bool pn_condition_is_redirect(pn_condition_t *condition) -{ - const char *name = pn_condition_get_name(condition); - return name && (!strcmp(name, "amqp:connection:redirect") || - !strcmp(name, "amqp:link:redirect")); -} - -const char *pn_condition_redirect_host(pn_condition_t *condition) -{ - pn_data_t *data = pn_condition_info(condition); - pn_data_rewind(data); - pn_data_next(data); - pn_data_enter(data); - pn_data_lookup(data, "network-host"); - pn_bytes_t host = pn_data_get_bytes(data); - pn_data_rewind(data); - return host.start; -} - -int pn_condition_redirect_port(pn_condition_t *condition) -{ - pn_data_t *data = pn_condition_info(condition); - pn_data_rewind(data); - pn_data_next(data); - pn_data_enter(data); - pn_data_lookup(data, "port"); - int port = pn_data_get_int(data); - pn_data_rewind(data); - return port; -} - -pn_connection_t *pn_event_connection(pn_event_t *event) -{ - pn_session_t *ssn; - pn_transport_t *transport; - - switch (pn_class_id(pn_event_class(event))) { - case CID_pn_connection: - return (pn_connection_t *) pn_event_context(event); - case CID_pn_transport: - transport = pn_event_transport(event); - if (transport) - return transport->connection; - return NULL; - default: - ssn = pn_event_session(event); - if (ssn) - return pn_session_connection(ssn); - } - return NULL; -} - -pn_session_t *pn_event_session(pn_event_t *event) -{ - pn_link_t *link; - switch (pn_class_id(pn_event_class(event))) { - case CID_pn_session: - return (pn_session_t *) pn_event_context(event); - default: - link = pn_event_link(event); - if (link) - return pn_link_session(link); - } - return NULL; -} - -pn_link_t *pn_event_link(pn_event_t *event) -{ - pn_delivery_t *dlv; - switch 
(pn_class_id(pn_event_class(event))) { - case CID_pn_link: - return (pn_link_t *) pn_event_context(event); - default: - dlv = pn_event_delivery(event); - if (dlv) - return pn_delivery_link(dlv); - } - return NULL; -} - -pn_delivery_t *pn_event_delivery(pn_event_t *event) -{ - switch (pn_class_id(pn_event_class(event))) { - case CID_pn_delivery: - return (pn_delivery_t *) pn_event_context(event); - default: - return NULL; - } -} - -pn_transport_t *pn_event_transport(pn_event_t *event) -{ - switch (pn_class_id(pn_event_class(event))) { - case CID_pn_transport: - return (pn_transport_t *) pn_event_context(event); - default: - { - pn_connection_t *conn = pn_event_connection(event); - if (conn) - return pn_connection_transport(conn); - return NULL; - } - } -} - -int pn_condition_copy(pn_condition_t *dest, pn_condition_t *src) { - assert(dest); - assert(src); - int err = 0; - if (src != dest) { - int err = pn_string_copy(dest->name, src->name); - if (!err) err = pn_string_copy(dest->description, src->description); - if (!err) err = pn_data_copy(dest->info, src->info); - } - return err; -}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/core/error.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/error.c b/proton-c/src/core/error.c deleted file mode 100644 index 70d36fa..0000000 --- a/proton-c/src/core/error.c +++ /dev/null @@ -1,136 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "platform/platform.h" -#include "util.h" - -#include <proton/error.h> -#include <stdlib.h> -#include <string.h> -#include <assert.h> - -struct pn_error_t { - char *text; - pn_error_t *root; - int code; -}; - -pn_error_t *pn_error() -{ - pn_error_t *error = (pn_error_t *) malloc(sizeof(pn_error_t)); - if (error != NULL) { - error->code = 0; - error->text = NULL; - error->root = NULL; - } - return error; -} - -void pn_error_free(pn_error_t *error) -{ - if (error) { - free(error->text); - free(error); - } -} - -void pn_error_clear(pn_error_t *error) -{ - if (error) { - error->code = 0; - free(error->text); - error->text = NULL; - error->root = NULL; - } -} - -int pn_error_set(pn_error_t *error, int code, const char *text) -{ - assert(error); - pn_error_clear(error); - if (code) { - error->code = code; - error->text = pn_strdup(text); - } - return code; -} - -int pn_error_vformat(pn_error_t *error, int code, const char *fmt, va_list ap) -{ - assert(error); - char text[1024]; - int n = pni_vsnprintf(text, 1024, fmt, ap); - if (n >= 1024) { - text[1023] = '\0'; - } - return pn_error_set(error, code, text); -} - -int pn_error_format(pn_error_t *error, int code, const char *fmt, ...) 
-{ - assert(error); - va_list ap; - va_start(ap, fmt); - int rcode = pn_error_vformat(error, code, fmt, ap); - va_end(ap); - return rcode; -} - -int pn_error_code(pn_error_t *error) -{ - assert(error); - return error->code; -} - -const char *pn_error_text(pn_error_t *error) -{ - assert(error); - return error->text; -} - -int pn_error_copy(pn_error_t *error, pn_error_t *src) -{ - assert(error); - if (src) { - return pn_error_set(error, pn_error_code(src), pn_error_text(src)); - } else { - pn_error_clear(error); - return 0; - } -} - -const char *pn_code(int code) -{ - switch (code) - { - case 0: return "<ok>"; - case PN_EOS: return "PN_EOS"; - case PN_ERR: return "PN_ERR"; - case PN_OVERFLOW: return "PN_OVERFLOW"; - case PN_UNDERFLOW: return "PN_UNDERFLOW"; - case PN_STATE_ERR: return "PN_STATE_ERR"; - case PN_ARG_ERR: return "PN_ARG_ERR"; - case PN_TIMEOUT: return "PN_TIMEOUT"; - case PN_INTR: return "PN_INTR"; - case PN_OUT_OF_MEMORY: return "PN_OUT_OF_MEMORY"; - default: return "<unknown>"; - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/core/event.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/event.c b/proton-c/src/core/event.c deleted file mode 100644 index 2a0a5cf..0000000 --- a/proton-c/src/core/event.c +++ /dev/null @@ -1,404 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include <stdio.h> -#include <proton/object.h> -#include <proton/event.h> -#include <proton/reactor.h> -#include <assert.h> - -struct pn_collector_t { - pn_list_t *pool; - pn_event_t *head; - pn_event_t *tail; - bool freed:1; - bool head_returned:1; /* Head has been returned by pn_collector_next() */ -}; - -struct pn_event_t { - pn_list_t *pool; - const pn_class_t *clazz; - void *context; // depends on clazz - pn_record_t *attachments; - pn_event_t *next; - pn_event_type_t type; -}; - -static void pn_collector_initialize(pn_collector_t *collector) -{ - collector->pool = pn_list(PN_OBJECT, 0); - collector->head = NULL; - collector->tail = NULL; - collector->freed = false; -} - -static void pn_collector_drain(pn_collector_t *collector) -{ - assert(collector); - while (pn_collector_next(collector)) - ; - assert(!collector->head); - assert(!collector->tail); -} - -static void pn_collector_shrink(pn_collector_t *collector) -{ - assert(collector); - pn_list_clear(collector->pool); -} - -static void pn_collector_finalize(pn_collector_t *collector) -{ - pn_collector_drain(collector); - pn_decref(collector->pool); -} - -static int pn_collector_inspect(pn_collector_t *collector, pn_string_t *dst) -{ - assert(collector); - int err = pn_string_addf(dst, "EVENTS["); - if (err) return err; - pn_event_t *event = collector->head; - bool first = true; - while (event) { - if (first) { - first = false; - } else { - err = pn_string_addf(dst, ", "); - if (err) return err; - } - err = pn_inspect(event, dst); - if (err) return err; - event = event->next; - } - 
return pn_string_addf(dst, "]"); -} - -#define pn_collector_hashcode NULL -#define pn_collector_compare NULL - -PN_CLASSDEF(pn_collector) - -pn_collector_t *pn_collector(void) -{ - return pn_collector_new(); -} - -void pn_collector_free(pn_collector_t *collector) -{ - assert(collector); - pn_collector_release(collector); - pn_decref(collector); -} - -void pn_collector_release(pn_collector_t *collector) -{ - assert(collector); - if (!collector->freed) { - collector->freed = true; - pn_collector_drain(collector); - pn_collector_shrink(collector); - } -} - -pn_event_t *pn_event(void); - -pn_event_t *pn_collector_put(pn_collector_t *collector, - const pn_class_t *clazz, void *context, - pn_event_type_t type) -{ - if (!collector) { - return NULL; - } - - assert(context); - - if (collector->freed) { - return NULL; - } - - pn_event_t *tail = collector->tail; - if (tail && tail->type == type && tail->context == context) { - return NULL; - } - - clazz = clazz->reify(context); - - pn_event_t *event = (pn_event_t *) pn_list_pop(collector->pool); - - if (!event) { - event = pn_event(); - } - - event->pool = collector->pool; - pn_incref(event->pool); - - if (tail) { - tail->next = event; - collector->tail = event; - } else { - collector->tail = event; - collector->head = event; - } - - event->clazz = clazz; - event->context = context; - event->type = type; - pn_class_incref(clazz, event->context); - - return event; -} - -pn_event_t *pn_collector_peek(pn_collector_t *collector) -{ - return collector->head; -} - -bool pn_collector_pop(pn_collector_t *collector) -{ - collector->head_returned = false; - pn_event_t *event = collector->head; - if (event) { - collector->head = event->next; - } else { - return false; - } - - if (!collector->head) { - collector->tail = NULL; - } - - pn_decref(event); - return true; -} - -pn_event_t *pn_collector_next(pn_collector_t *collector) -{ - if (collector->head_returned) { - pn_collector_pop(collector); - } - collector->head_returned = 
collector->head; - return collector->head; -} - -pn_event_t *pn_collector_prev(pn_collector_t *collector) { - return collector->head_returned ? collector->head : NULL; -} - -bool pn_collector_more(pn_collector_t *collector) -{ - assert(collector); - return collector->head && collector->head->next; -} - -static void pn_event_initialize(pn_event_t *event) -{ - event->pool = NULL; - event->type = PN_EVENT_NONE; - event->clazz = NULL; - event->context = NULL; - event->next = NULL; - event->attachments = pn_record(); -} - -static void pn_event_finalize(pn_event_t *event) { - // decref before adding to the free list - if (event->clazz && event->context) { - pn_class_decref(event->clazz, event->context); - } - - pn_list_t *pool = event->pool; - - if (pool && pn_refcount(pool) > 1) { - event->pool = NULL; - event->type = PN_EVENT_NONE; - event->clazz = NULL; - event->context = NULL; - event->next = NULL; - pn_record_clear(event->attachments); - pn_list_add(pool, event); - } else { - pn_decref(event->attachments); - } - - pn_decref(pool); -} - -static int pn_event_inspect(pn_event_t *event, pn_string_t *dst) -{ - assert(event); - assert(dst); - const char *name = pn_event_type_name(event->type); - int err; - if (name) { - err = pn_string_addf(dst, "(%s", pn_event_type_name(event->type)); - } else { - err = pn_string_addf(dst, "(<%u>", (unsigned int) event->type); - } - if (err) return err; - if (event->context) { - err = pn_string_addf(dst, ", "); - if (err) return err; - err = pn_class_inspect(event->clazz, event->context, dst); - if (err) return err; - } - - return pn_string_addf(dst, ")"); -} - -#define pn_event_hashcode NULL -#define pn_event_compare NULL - -PN_CLASSDEF(pn_event) - -pn_event_t *pn_event(void) -{ - return pn_event_new(); -} - -pn_event_type_t pn_event_type(pn_event_t *event) -{ - return event->type; -} - -const pn_class_t *pn_event_class(pn_event_t *event) -{ - assert(event); - return event->clazz; -} - -void *pn_event_context(pn_event_t *event) -{ - 
assert(event); - return event->context; -} - -pn_record_t *pn_event_attachments(pn_event_t *event) -{ - assert(event); - return event->attachments; -} - -const char *pn_event_type_name(pn_event_type_t type) -{ - switch (type) { - case PN_EVENT_NONE: - return "PN_EVENT_NONE"; - case PN_REACTOR_INIT: - return "PN_REACTOR_INIT"; - case PN_REACTOR_QUIESCED: - return "PN_REACTOR_QUIESCED"; - case PN_REACTOR_FINAL: - return "PN_REACTOR_FINAL"; - case PN_TIMER_TASK: - return "PN_TIMER_TASK"; - case PN_CONNECTION_INIT: - return "PN_CONNECTION_INIT"; - case PN_CONNECTION_BOUND: - return "PN_CONNECTION_BOUND"; - case PN_CONNECTION_UNBOUND: - return "PN_CONNECTION_UNBOUND"; - case PN_CONNECTION_REMOTE_OPEN: - return "PN_CONNECTION_REMOTE_OPEN"; - case PN_CONNECTION_LOCAL_OPEN: - return "PN_CONNECTION_LOCAL_OPEN"; - case PN_CONNECTION_REMOTE_CLOSE: - return "PN_CONNECTION_REMOTE_CLOSE"; - case PN_CONNECTION_LOCAL_CLOSE: - return "PN_CONNECTION_LOCAL_CLOSE"; - case PN_CONNECTION_FINAL: - return "PN_CONNECTION_FINAL"; - case PN_SESSION_INIT: - return "PN_SESSION_INIT"; - case PN_SESSION_REMOTE_OPEN: - return "PN_SESSION_REMOTE_OPEN"; - case PN_SESSION_LOCAL_OPEN: - return "PN_SESSION_LOCAL_OPEN"; - case PN_SESSION_REMOTE_CLOSE: - return "PN_SESSION_REMOTE_CLOSE"; - case PN_SESSION_LOCAL_CLOSE: - return "PN_SESSION_LOCAL_CLOSE"; - case PN_SESSION_FINAL: - return "PN_SESSION_FINAL"; - case PN_LINK_INIT: - return "PN_LINK_INIT"; - case PN_LINK_REMOTE_OPEN: - return "PN_LINK_REMOTE_OPEN"; - case PN_LINK_LOCAL_OPEN: - return "PN_LINK_LOCAL_OPEN"; - case PN_LINK_REMOTE_CLOSE: - return "PN_LINK_REMOTE_CLOSE"; - case PN_LINK_LOCAL_DETACH: - return "PN_LINK_LOCAL_DETACH"; - case PN_LINK_REMOTE_DETACH: - return "PN_LINK_REMOTE_DETACH"; - case PN_LINK_LOCAL_CLOSE: - return "PN_LINK_LOCAL_CLOSE"; - case PN_LINK_FLOW: - return "PN_LINK_FLOW"; - case PN_LINK_FINAL: - return "PN_LINK_FINAL"; - case PN_DELIVERY: - return "PN_DELIVERY"; - case PN_TRANSPORT: - return "PN_TRANSPORT"; - case 
PN_TRANSPORT_AUTHENTICATED: - return "PN_TRANSPORT_AUTHENTICATED"; - case PN_TRANSPORT_ERROR: - return "PN_TRANSPORT_ERROR"; - case PN_TRANSPORT_HEAD_CLOSED: - return "PN_TRANSPORT_HEAD_CLOSED"; - case PN_TRANSPORT_TAIL_CLOSED: - return "PN_TRANSPORT_TAIL_CLOSED"; - case PN_TRANSPORT_CLOSED: - return "PN_TRANSPORT_CLOSED"; - case PN_SELECTABLE_INIT: - return "PN_SELECTABLE_INIT"; - case PN_SELECTABLE_UPDATED: - return "PN_SELECTABLE_UPDATED"; - case PN_SELECTABLE_READABLE: - return "PN_SELECTABLE_READABLE"; - case PN_SELECTABLE_WRITABLE: - return "PN_SELECTABLE_WRITABLE"; - case PN_SELECTABLE_ERROR: - return "PN_SELECTABLE_ERROR"; - case PN_SELECTABLE_EXPIRED: - return "PN_SELECTABLE_EXPIRED"; - case PN_SELECTABLE_FINAL: - return "PN_SELECTABLE_FINAL"; - case PN_CONNECTION_WAKE: - return "PN_CONNECTION_WAKE"; - case PN_LISTENER_CLOSE: - return "PN_LISTENER_CLOSE"; - case PN_PROACTOR_INTERRUPT: - return "PN_PROACTOR_INTERRUPT"; - case PN_PROACTOR_TIMEOUT: - return "PN_PROACTOR_TIMEOUT"; - case PN_PROACTOR_INACTIVE: - return "PN_PROACTOR_INACTIVE"; - default: - return "PN_UNKNOWN"; - } - return NULL; -} - -pn_event_t *pn_event_batch_next(pn_event_batch_t *batch) { - return batch->next_event(batch); -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/core/framing.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/framing.c b/proton-c/src/core/framing.c deleted file mode 100644 index 09bf4bb..0000000 --- a/proton-c/src/core/framing.c +++ /dev/null @@ -1,103 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <stdio.h> -#include <string.h> - -#include "framing.h" - -// TODO: These are near duplicates of code in codec.c - they should be -// deduplicated. -static inline void pn_i_write16(char *bytes, uint16_t value) -{ - bytes[0] = 0xFF & (value >> 8); - bytes[1] = 0xFF & (value ); -} - - -static inline void pn_i_write32(char *bytes, uint32_t value) -{ - bytes[0] = 0xFF & (value >> 24); - bytes[1] = 0xFF & (value >> 16); - bytes[2] = 0xFF & (value >> 8); - bytes[3] = 0xFF & (value ); -} - -static inline uint16_t pn_i_read16(const char *bytes) -{ - uint16_t a = (uint8_t) bytes[0]; - uint16_t b = (uint8_t) bytes[1]; - uint16_t r = a << 8 - | b; - return r; -} - -static inline uint32_t pn_i_read32(const char *bytes) -{ - uint32_t a = (uint8_t) bytes[0]; - uint32_t b = (uint8_t) bytes[1]; - uint32_t c = (uint8_t) bytes[2]; - uint32_t d = (uint8_t) bytes[3]; - uint32_t r = a << 24 - | b << 16 - | c << 8 - | d; - return r; -} - - -ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max) -{ - if (available < AMQP_HEADER_SIZE) return 0; - uint32_t size = pn_i_read32(&bytes[0]); - if (max && size > max) return PN_ERR; - if (available < size) return 0; - unsigned int doff = 4 * (uint8_t)bytes[4]; - if (doff < AMQP_HEADER_SIZE || doff > size) return PN_ERR; - - frame->size = size - doff; - frame->ex_size = doff - AMQP_HEADER_SIZE; - frame->type = bytes[5]; - frame->channel = pn_i_read16(&bytes[6]); - frame->extended = bytes + AMQP_HEADER_SIZE; - frame->payload = bytes + doff; - - return size; -} - -size_t 
pn_write_frame(char *bytes, size_t available, pn_frame_t frame) -{ - size_t size = AMQP_HEADER_SIZE + frame.ex_size + frame.size; - if (size <= available) - { - pn_i_write32(&bytes[0], size); - int doff = (frame.ex_size + AMQP_HEADER_SIZE - 1)/4 + 1; - bytes[4] = doff; - bytes[5] = frame.type; - pn_i_write16(&bytes[6], frame.channel); - - memmove(bytes + AMQP_HEADER_SIZE, frame.extended, frame.ex_size); - memmove(bytes + 4*doff, frame.payload, frame.size); - return size; - } else { - return 0; - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/core/framing.h ---------------------------------------------------------------------- diff --git a/proton-c/src/core/framing.h b/proton-c/src/core/framing.h deleted file mode 100644 index ecb88a4..0000000 --- a/proton-c/src/core/framing.h +++ /dev/null @@ -1,44 +0,0 @@ -#ifndef PROTON_FRAMING_H -#define PROTON_FRAMING_H 1 - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include <proton/import_export.h> -#include <proton/type_compat.h> -#include <proton/error.h> - -#define AMQP_HEADER_SIZE (8) -#define AMQP_MIN_MAX_FRAME_SIZE ((uint32_t)512) // minimum allowable max-frame - -typedef struct { - uint8_t type; - uint16_t channel; - size_t ex_size; - const char *extended; - size_t size; - const char *payload; -} pn_frame_t; - -ssize_t pn_read_frame(pn_frame_t *frame, const char *bytes, size_t available, uint32_t max); -size_t pn_write_frame(char *bytes, size_t size, pn_frame_t frame); - -#endif /* framing.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/core/log.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/log.c b/proton-c/src/core/log.c deleted file mode 100644 index ff96ff0..0000000 --- a/proton-c/src/core/log.c +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#include <proton/log.h> -#include <proton/object.h> -#include <stdio.h> -#include "log_private.h" -#include "util.h" - - -static void stderr_logger(const char *message) { - fprintf(stderr, "%s\n", message); -} - -static pn_logger_t logger = stderr_logger; -static int enabled_env = -1; /* Set from environment variable. */ -static int enabled_call = -1; /* set by pn_log_enable */ - -void pn_log_enable(bool value) { - enabled_call = value; -} - -bool pn_log_enabled(void) { - if (enabled_call != -1) return enabled_call; /* Takes precedence */ - if (enabled_env == -1) - enabled_env = pn_env_bool("PN_TRACE_LOG"); - return enabled_env; -} - -void pn_log_logger(pn_logger_t new_logger) { - logger = new_logger; - if (!logger) pn_log_enable(false); -} - -void pn_vlogf_impl(const char *fmt, va_list ap) { - pn_string_t *msg = pn_string(""); - pn_string_vformat(msg, fmt, ap); - fprintf(stderr, "%s\n", pn_string_get(msg)); -} - -/**@internal - * - * Note: We check pn_log_enabled() in the pn_logf macro *before* calling - * pn_logf_impl because evaluating the arguments to that call could have - * side-effects with performance impact (e.g. calling functions to construct - * complicated messages.) It is important that a disabled log statement results - * in nothing more than a call to pn_log_enabled(). - */ -void pn_logf_impl(const char *fmt, ...) { - va_list ap; - va_start(ap, fmt); - pn_vlogf_impl(fmt, ap); - va_end(ap); -} - http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/core/log_private.h ---------------------------------------------------------------------- diff --git a/proton-c/src/core/log_private.h b/proton-c/src/core/log_private.h deleted file mode 100644 index 4725045..0000000 --- a/proton-c/src/core/log_private.h +++ /dev/null @@ -1,54 +0,0 @@ -#ifndef LOG_PRIVATE_H -#define LOG_PRIVATE_H -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -/**@file - * - * Log messages that are not associated with a transport. - */ - -#include <proton/log.h> -#include <stdarg.h> - -/** Log a printf style message */ -#define pn_logf(...) \ - do { \ - if (pn_log_enabled()) \ - pn_logf_impl(__VA_ARGS__); \ - } while(0) - -/** va_list version of pn_logf */ -#define pn_vlogf(fmt, ap) \ - do { \ - if (pn_log_enabled()) \ - pn_vlogf_impl(fmt, ap); \ - } while(0) - -/** Return true if logging is enabled. */ -bool pn_log_enabled(void); - -/**@internal*/ -void pn_logf_impl(const char* fmt, ...); -/**@internal*/ -void pn_vlogf_impl(const char *fmt, va_list ap); - - - -#endif --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
