http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/transport/transport.c ---------------------------------------------------------------------- diff --git a/proton-c/src/transport/transport.c b/proton-c/src/transport/transport.c deleted file mode 100644 index cdecfd2..0000000 --- a/proton-c/src/transport/transport.c +++ /dev/null @@ -1,3018 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "engine/engine-internal.h" -#include "framing/framing.h" -#include "sasl/sasl-internal.h" -#include "ssl/ssl-internal.h" - -#include "autodetect.h" -#include "protocol.h" -#include "dispatch_actions.h" -#include "proton/event.h" -#include "platform.h" -#include "platform_fmt.h" -#include "config.h" -#include "log_private.h" - -#include <stdlib.h> -#include <string.h> -#include <assert.h> -#include <stdarg.h> -#include <stdio.h> - -static ssize_t transport_consume(pn_transport_t *transport); - -// delivery buffers - -/* - * Call this any time anything happens that may affect channel_max: - * i.e. when the app indicates a preference, or when we receive the - * OPEN frame from the remote peer. And call it to do the final - * calculation just before we communicate our limit to the remote - * peer by sending our OPEN frame. - */ -static void pni_calculate_channel_max(pn_transport_t *transport) { - /* - * The application cannot make the limit larger than - * what this library will allow. - */ - transport->channel_max = (PN_IMPL_CHANNEL_MAX < transport->local_channel_max) - ? PN_IMPL_CHANNEL_MAX - : transport->local_channel_max; - - /* - * The remote peer's constraint is not valid until the - * peer's open frame has been received. - */ - if(transport->open_rcvd) { - transport->channel_max = (transport->channel_max < transport->remote_channel_max) - ? transport->channel_max - : transport->remote_channel_max; - } -} - -void pn_delivery_map_init(pn_delivery_map_t *db, pn_sequence_t next) -{ - db->deliveries = pn_hash(PN_WEAKREF, 0, 0.75); - db->next = next; -} - -void pn_delivery_map_free(pn_delivery_map_t *db) -{ - pn_free(db->deliveries); -} - -static pn_delivery_t *pni_delivery_map_get(pn_delivery_map_t *db, pn_sequence_t id) -{ - return (pn_delivery_t *) pn_hash_get(db->deliveries, id); -} - -static void pn_delivery_state_init(pn_delivery_state_t *ds, pn_delivery_t *delivery, pn_sequence_t id) -{ - ds->id = id; - ds->sent = false; - ds->init = true; -} - -static pn_delivery_state_t *pni_delivery_map_push(pn_delivery_map_t *db, pn_delivery_t *delivery) -{ - pn_delivery_state_t *ds = &delivery->state; - pn_delivery_state_init(ds, delivery, db->next++); - pn_hash_put(db->deliveries, ds->id, delivery); - return ds; -} - -void pn_delivery_map_del(pn_delivery_map_t *db, pn_delivery_t *delivery) -{ - if (delivery->state.init) { - delivery->state.init = false; - delivery->state.sent = false; - pn_hash_del(db->deliveries, delivery->state.id); - } -} - -static void pni_delivery_map_clear(pn_delivery_map_t *dm) -{ - pn_hash_t *hash = dm->deliveries; - for (pn_handle_t entry = pn_hash_head(hash); - entry; - entry = pn_hash_next(hash, entry)) - { - pn_delivery_t *dlv = (pn_delivery_t *) pn_hash_value(hash, entry); - pn_delivery_map_del(dm, dlv); - } - dm->next = 0; -} - -static void pni_default_tracer(pn_transport_t *transport, const char *message) -{ - fprintf(stderr, "[%p]:%s\n", (void *) transport, message); -} - -static ssize_t pn_io_layer_input_passthru(pn_transport_t *, unsigned int, const char *, size_t ); -static ssize_t pn_io_layer_output_passthru(pn_transport_t *, unsigned int, char *, size_t ); - -static ssize_t pn_io_layer_input_error(pn_transport_t *, unsigned int, const char *, size_t ); -static ssize_t pn_io_layer_output_error(pn_transport_t *, unsigned int, char *, size_t ); - -static ssize_t pn_io_layer_input_setup(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available); -static ssize_t pn_io_layer_output_setup(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available); - -static ssize_t pn_input_read_amqp_header(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available); -static ssize_t pn_input_read_amqp(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available); -static ssize_t pn_output_write_amqp_header(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available); -static ssize_t pn_output_write_amqp(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available); -static void pn_error_amqp(pn_transport_t *transport, unsigned int layer); -static pn_timestamp_t pn_tick_amqp(pn_transport_t *transport, unsigned int layer, pn_timestamp_t now); - -static ssize_t pn_io_layer_input_autodetect(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available); -static ssize_t pn_io_layer_output_null(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available); - -const pn_io_layer_t amqp_header_layer = { - pn_input_read_amqp_header, - pn_output_write_amqp_header, - NULL, - pn_tick_amqp, - NULL -}; - -const pn_io_layer_t amqp_write_header_layer = { - pn_input_read_amqp, - pn_output_write_amqp_header, - NULL, - pn_tick_amqp, - NULL -}; - -const pn_io_layer_t amqp_read_header_layer = { - pn_input_read_amqp_header, - pn_output_write_amqp, - pn_error_amqp, - pn_tick_amqp, - NULL -}; - -const pn_io_layer_t amqp_layer = { - pn_input_read_amqp, - pn_output_write_amqp, - pn_error_amqp, - pn_tick_amqp, - NULL -}; - -const pn_io_layer_t pni_setup_layer = { - pn_io_layer_input_setup, - pn_io_layer_output_setup, - NULL, - NULL, - NULL -}; - -const pn_io_layer_t pni_autodetect_layer = { - pn_io_layer_input_autodetect, - pn_io_layer_output_null, - NULL, - NULL, - NULL -}; - -const pn_io_layer_t pni_passthru_layer = { - pn_io_layer_input_passthru, - pn_io_layer_output_passthru, - NULL, - NULL, - NULL -}; - -const pn_io_layer_t pni_header_error_layer = { - pn_io_layer_input_error, - pn_output_write_amqp_header, - NULL, - NULL, - NULL -}; - -const pn_io_layer_t pni_error_layer = { - pn_io_layer_input_error, - pn_io_layer_output_error, - pn_error_amqp, - NULL, - NULL -}; - -/* Set up the transport protocol layers depending on what is configured */ -static void pn_io_layer_setup(pn_transport_t *transport, unsigned int layer) -{ - assert(layer == 0); - // Figure out if we are server or not - if (transport->server) { - transport->io_layers[layer++] = &pni_autodetect_layer; - return; - } - if (transport->ssl) { - transport->io_layers[layer++] = &ssl_layer; - } - if (transport->sasl) { - transport->io_layers[layer++] = &sasl_header_layer; - } - transport->io_layers[layer++] = &amqp_header_layer; -} - -ssize_t pn_io_layer_input_setup(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available) -{ - pn_io_layer_setup(transport, layer); - return transport->io_layers[layer]->process_input(transport, layer, bytes, available); -} - -ssize_t pn_io_layer_output_setup(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available) -{ - pn_io_layer_setup(transport, layer); - return transport->io_layers[layer]->process_output(transport, layer, bytes, available); -} - -void pn_set_error_layer(pn_transport_t *transport) -{ - // Set every layer to the error layer in case we manually - // pass through (happens from SASL to AMQP) - for (int layer=0; layer<PN_IO_LAYER_CT; ++layer) { - transport->io_layers[layer] = &pni_error_layer; - } -} - -// Autodetect the layer by reading the protocol header -ssize_t pn_io_layer_input_autodetect(pn_transport_t *transport, unsigned int layer, const char *bytes, size_t available) -{ - const char* error; - bool eos = pn_transport_capacity(transport)==PN_EOS; - if (eos && available==0) { - pn_do_error(transport, "amqp:connection:framing-error", "No valid protocol header found"); - pn_set_error_layer(transport); - return PN_EOS; - } - pni_protocol_type_t protocol = pni_sniff_header(bytes, available); - if (transport->trace & PN_TRACE_DRV) - pn_transport_logf(transport, "%s detected", pni_protocol_name(protocol)); - switch (protocol) { - case PNI_PROTOCOL_SSL: - if (!(transport->allowed_layers & LAYER_SSL)) { - error = "SSL protocol header not allowed (maybe detected twice)"; - break; - } - transport->present_layers |= LAYER_SSL; - transport->allowed_layers &= LAYER_AMQP1 | LAYER_AMQPSASL; - if (!transport->ssl) { - pn_ssl(transport); - } - transport->io_layers[layer] = &ssl_layer; - transport->io_layers[layer+1] = &pni_autodetect_layer; - return ssl_layer.process_input(transport, layer, bytes, available); - case PNI_PROTOCOL_AMQP_SSL: - if (!(transport->allowed_layers & LAYER_AMQPSSL)) { - error = "AMQP SSL protocol header not allowed (maybe detected twice)"; - break; - } - transport->present_layers |= LAYER_AMQPSSL; - transport->allowed_layers &= LAYER_AMQP1 | LAYER_AMQPSASL; - if (!transport->ssl) { - pn_ssl(transport); - } - transport->io_layers[layer] = &ssl_layer; - transport->io_layers[layer+1] = &pni_autodetect_layer; - return 8; - case PNI_PROTOCOL_AMQP_SASL: - if (!(transport->allowed_layers & LAYER_AMQPSASL)) { - error = "AMQP SASL protocol header not allowed (maybe detected twice)"; - break; - } - transport->present_layers |= LAYER_AMQPSASL; - transport->allowed_layers &= LAYER_AMQP1 | LAYER_AMQPSSL; - if (!transport->sasl) { - pn_sasl(transport); - } - transport->io_layers[layer] = &sasl_write_header_layer; - transport->io_layers[layer+1] = &pni_autodetect_layer; - if (transport->trace & PN_TRACE_FRM) - pn_transport_logf(transport, " <- %s", "SASL"); - pni_sasl_set_external_security(transport, pn_ssl_get_ssf((pn_ssl_t*)transport), pn_ssl_get_remote_subject((pn_ssl_t*)transport)); - return 8; - case PNI_PROTOCOL_AMQP1: - if (!(transport->allowed_layers & LAYER_AMQP1)) { - error = "AMQP1.0 protocol header not allowed (maybe detected twice)"; - break; - } - transport->present_layers |= LAYER_AMQP1; - transport->allowed_layers = LAYER_NONE; - if (transport->auth_required && !pn_transport_is_authenticated(transport)) { - pn_do_error(transport, "amqp:connection:policy-error", - "Client skipped authentication - forbidden"); - pn_set_error_layer(transport); - return 8; - } - if (transport->encryption_required && !pn_transport_is_encrypted(transport)) { - pn_do_error(transport, "amqp:connection:policy-error", - "Client connection unencryted - forbidden"); - pn_set_error_layer(transport); - return 8; - } - transport->io_layers[layer] = &amqp_write_header_layer; - if (transport->trace & PN_TRACE_FRM) - pn_transport_logf(transport, " <- %s", "AMQP"); - return 8; - case PNI_PROTOCOL_INSUFFICIENT: - if (!eos) return 0; - error = "End of input stream before protocol detection"; - break; - case PNI_PROTOCOL_AMQP_OTHER: - error = "Incompatible AMQP connection detected"; - break; - case PNI_PROTOCOL_UNKNOWN: - default: - error = "Unknown protocol detected"; - break; - } - transport->io_layers[layer] = &pni_header_error_layer; - char quoted[1024]; - pn_quote_data(quoted, 1024, bytes, available); - pn_do_error(transport, "amqp:connection:framing-error", - "%s: '%s'%s", error, quoted, - !eos ? "" : " (connection aborted)"); - return 0; -} - -// We don't know what the output should be - do nothing -ssize_t pn_io_layer_output_null(pn_transport_t *transport, unsigned int layer, char *bytes, size_t available) -{ - return 0; -} - -/** Pass through input handler */ -ssize_t pn_io_layer_input_passthru(pn_transport_t *transport, unsigned int layer, const char *data, size_t available) -{ - if (layer+1<PN_IO_LAYER_CT) - return transport->io_layers[layer+1]->process_input(transport, layer+1, data, available); - return PN_EOS; -} - -/** Pass through output handler */ -ssize_t pn_io_layer_output_passthru(pn_transport_t *transport, unsigned int layer, char *data, size_t available) -{ - if (layer+1<PN_IO_LAYER_CT) - return transport->io_layers[layer+1]->process_output(transport, layer+1, data, available); - return PN_EOS; -} - -/** Input handler after detected error */ -ssize_t pn_io_layer_input_error(pn_transport_t *transport, unsigned int layer, const char *data, size_t available) -{ - return PN_EOS; -} - -/** Output handler after detected error */ -ssize_t pn_io_layer_output_error(pn_transport_t *transport, unsigned int layer, char *data, size_t available) -{ - return PN_EOS; -} - -static void pn_transport_initialize(void *object) -{ - pn_transport_t *transport = (pn_transport_t *)object; - transport->freed = false; - transport->output_buf = NULL; - transport->output_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024; - transport->input_buf = NULL; - transport->input_size = PN_DEFAULT_MAX_FRAME_SIZE ? PN_DEFAULT_MAX_FRAME_SIZE : 16 * 1024; - transport->tracer = pni_default_tracer; - transport->sasl = NULL; - transport->ssl = NULL; - - transport->scratch = pn_string(NULL); - transport->args = pn_data(16); - transport->output_args = pn_data(16); - transport->frame = pn_buffer(PN_TRANSPORT_INITIAL_FRAME_SIZE); - transport->input_frames_ct = 0; - transport->output_frames_ct = 0; - - transport->connection = NULL; - transport->context = pn_record(); - - for (int layer=0; layer<PN_IO_LAYER_CT; ++layer) { - transport->io_layers[layer] = NULL; - } - - transport->allowed_layers = LAYER_AMQP1 | LAYER_AMQPSASL | LAYER_AMQPSSL | LAYER_SSL; - transport->present_layers = LAYER_NONE; - - // Defer setting up the layers until the first data arrives or is sent - transport->io_layers[0] = &pni_setup_layer; - - transport->open_sent = false; - transport->open_rcvd = false; - transport->close_sent = false; - transport->close_rcvd = false; - transport->tail_closed = false; - transport->head_closed = false; - transport->remote_container = NULL; - transport->remote_hostname = NULL; - transport->local_max_frame = PN_DEFAULT_MAX_FRAME_SIZE; - transport->remote_max_frame = (uint32_t) 0xffffffff; - - /* - * We set the local limit on channels to 2^15, because - * parts of the code use the topmost bit (of a short) - * as a flag. - * The peer that this transport connects to may also - * place its own limit on max channel number, and the - * application may also set a limit. - * The maximum that we use will be the minimum of all - * these constraints. - */ - // There is no constraint yet from remote peer, - // so set to max possible. - transport->remote_channel_max = 65535; - transport->local_channel_max = PN_IMPL_CHANNEL_MAX; - transport->channel_max = transport->local_channel_max; - - transport->local_idle_timeout = 0; - transport->dead_remote_deadline = 0; - transport->last_bytes_input = 0; - transport->remote_idle_timeout = 0; - transport->keepalive_deadline = 0; - transport->last_bytes_output = 0; - transport->remote_offered_capabilities = pn_data(0); - transport->remote_desired_capabilities = pn_data(0); - transport->remote_properties = pn_data(0); - transport->disp_data = pn_data(0); - pn_condition_init(&transport->remote_condition); - pn_condition_init(&transport->condition); - transport->error = pn_error(); - - transport->local_channels = pn_hash(PN_WEAKREF, 0, 0.75); - transport->remote_channels = pn_hash(PN_WEAKREF, 0, 0.75); - - transport->bytes_input = 0; - transport->bytes_output = 0; - - transport->input_pending = 0; - transport->output_pending = 0; - - transport->done_processing = false; - - transport->posted_idle_timeout = false; - - transport->server = false; - transport->halt = false; - transport->auth_required = false; - transport->authenticated = false; - transport->encryption_required = false; - - transport->referenced = true; - - transport->trace = - (pn_env_bool("PN_TRACE_RAW") ? PN_TRACE_RAW : PN_TRACE_OFF) | - (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) | - (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF) | - (pn_env_bool("PN_TRACE_EVT") ? PN_TRACE_EVT : PN_TRACE_OFF) ; -} - - -static pn_session_t *pni_channel_state(pn_transport_t *transport, uint16_t channel) -{ - return (pn_session_t *) pn_hash_get(transport->remote_channels, channel); -} - -static void pni_map_remote_channel(pn_session_t *session, uint16_t channel) -{ - pn_transport_t *transport = session->connection->transport; - pn_hash_put(transport->remote_channels, channel, session); - session->state.remote_channel = channel; - pn_ep_incref(&session->endpoint); -} - -void pni_transport_unbind_handles(pn_hash_t *handles, bool reset_state); - -static void pni_unmap_remote_channel(pn_session_t *ssn) -{ - // XXX: should really update link state also - pni_delivery_map_clear(&ssn->state.incoming); - pni_transport_unbind_handles(ssn->state.remote_handles, false); - pn_transport_t *transport = ssn->connection->transport; - uint16_t channel = ssn->state.remote_channel; - ssn->state.remote_channel = -2; - if (pn_hash_get(transport->remote_channels, channel)) { - pn_ep_decref(&ssn->endpoint); - } - // note: may free the session: - pn_hash_del(transport->remote_channels, channel); -} - -static void pn_transport_incref(void *object) -{ - pn_transport_t *transport = (pn_transport_t *) object; - if (!transport->referenced) { - transport->referenced = true; - if (transport->connection) { - pn_incref(transport->connection); - } else { - pn_object_incref(object); - } - } else { - pn_object_incref(object); - } -} - -static void pn_transport_finalize(void *object); -#define pn_transport_new pn_object_new -#define pn_transport_refcount pn_object_refcount -#define pn_transport_decref pn_object_decref -#define pn_transport_reify pn_object_reify -#define pn_transport_hashcode NULL -#define pn_transport_compare NULL -#define pn_transport_inspect NULL - -pn_transport_t *pn_transport(void) -{ -#define pn_transport_free pn_object_free - static const pn_class_t clazz = PN_METACLASS(pn_transport); -#undef pn_transport_free - pn_transport_t *transport = - (pn_transport_t *) pn_class_new(&clazz, sizeof(pn_transport_t)); - if (!transport) return NULL; - - transport->output_buf = (char *) malloc(transport->output_size); - if (!transport->output_buf) { - pn_transport_free(transport); - return NULL; - } - - transport->input_buf = (char *) malloc(transport->input_size); - if (!transport->input_buf) { - pn_transport_free(transport); - return NULL; - } - - transport->capacity = 4*1024; - transport->available = 0; - transport->output = (char *) malloc(transport->capacity); - if (!transport->output) { - pn_transport_free(transport); - return NULL; - } - - return transport; -} - -void pn_transport_set_server(pn_transport_t *transport) -{ - assert(transport); - transport->server = true; -} - -const char *pn_transport_get_user(pn_transport_t *transport) -{ - assert(transport); - // Client - just return whatever we gave to sasl - if (!transport->server) { - if (transport->sasl) return pn_sasl_get_user((pn_sasl_t *)transport); - return "anonymous"; - } - - // Server - // Not finished authentication yet - if (!(transport->present_layers & LAYER_AMQP1)) return 0; - // We have SASL so it takes precedence - if (transport->present_layers & LAYER_AMQPSASL) return pn_sasl_get_user((pn_sasl_t *)transport); - // No SASL but we may have a SSL remote_subject - if (transport->present_layers & (LAYER_AMQPSSL | LAYER_SSL)) return pn_ssl_get_remote_subject((pn_ssl_t *)transport); - // otherwise it's just an unauthenticated anonymous connection - return "anonymous"; -} - -void pn_transport_require_auth(pn_transport_t *transport, bool required) -{ - assert(transport); - transport->auth_required = required; -} - -bool pn_transport_is_authenticated(pn_transport_t *transport) -{ - return transport && transport->authenticated; -} - -void pn_transport_require_encryption(pn_transport_t *transport, bool required) -{ - assert(transport); - transport->encryption_required = required; -} - -bool pn_transport_is_encrypted(pn_transport_t *transport) -{ - return transport && transport->ssl && pn_ssl_get_ssf((pn_ssl_t*)transport)>0; -} - -void pn_transport_free(pn_transport_t *transport) -{ - if (!transport) return; - assert(!transport->freed); - transport->freed = true; - pn_decref(transport); -} - -static void pn_transport_finalize(void *object) -{ - pn_transport_t *transport = (pn_transport_t *) object; - - if (transport->referenced && transport->connection && pn_refcount(transport->connection) > 1) { - pn_object_incref(transport); - transport->referenced = false; - pn_decref(transport->connection); - return; - } - - // once the application frees the transport, no further I/O - // processing can be done to the connection: - pn_transport_unbind(transport); - // we may have posted events, so stay alive until they are processed - if (pn_refcount(transport) > 0) return; - - pn_ssl_free(transport); - pn_sasl_free(transport); - free(transport->remote_container); - free(transport->remote_hostname); - pn_free(transport->remote_offered_capabilities); - pn_free(transport->remote_desired_capabilities); - pn_free(transport->remote_properties); - pn_free(transport->disp_data); - pn_condition_tini(&transport->remote_condition); - pn_condition_tini(&transport->condition); - pn_error_free(transport->error); - pn_free(transport->local_channels); - pn_free(transport->remote_channels); - if (transport->input_buf) free(transport->input_buf); - if (transport->output_buf) free(transport->output_buf); - pn_free(transport->scratch); - pn_data_free(transport->args); - pn_data_free(transport->output_args); - pn_buffer_free(transport->frame); - pn_free(transport->context); - free(transport->output); -} - -static void pni_post_remote_open_events(pn_transport_t *transport, pn_connection_t *connection) { - pn_collector_put(connection->collector, PN_OBJECT, connection, PN_CONNECTION_REMOTE_OPEN); - if (transport->remote_idle_timeout) { - pn_collector_put(connection->collector, PN_OBJECT, transport, PN_TRANSPORT); - } -} - -int pn_transport_bind(pn_transport_t *transport, pn_connection_t *connection) -{ - assert(transport); - assert(connection); - - if (transport->connection) return PN_STATE_ERR; - if (connection->transport) return PN_STATE_ERR; - - transport->connection = connection; - connection->transport = transport; - - pn_incref(connection); - - pn_connection_bound(connection); - - // set the hostname/user/password - if (pn_string_size(connection->auth_user)) { - pn_sasl(transport); - pni_sasl_set_user_password(transport, pn_string_get(connection->auth_user), pn_string_get(connection->auth_password)); - } - - if (pn_string_size(connection->hostname)) { - if (transport->sasl) { - pni_sasl_set_remote_hostname(transport, pn_string_get(connection->hostname)); - } - - // be sure not to overwrite a hostname already set by the user via - // pn_ssl_set_peer_hostname() called before the bind - if (transport->ssl) { - size_t name_len = 0; - pn_ssl_get_peer_hostname((pn_ssl_t*) transport, NULL, &name_len); - if (name_len == 0) { - pn_ssl_set_peer_hostname((pn_ssl_t*) transport, pn_string_get(connection->hostname)); - } - } - } - - if (transport->open_rcvd) { - PN_SET_REMOTE(connection->endpoint.state, PN_REMOTE_ACTIVE); - pni_post_remote_open_events(transport, connection); - transport->halt = false; - transport_consume(transport); // blech - testBindAfterOpen - } - - return 0; -} - -void pni_transport_unbind_handles(pn_hash_t *handles, bool reset_state) -{ - for (pn_handle_t h = pn_hash_head(handles); h; h = pn_hash_next(handles, h)) { - uintptr_t key = pn_hash_key(handles, h); - pn_link_t *link = (pn_link_t *) pn_hash_value(handles, h); - if (reset_state) { - pn_link_unbound(link); - } - pn_ep_decref(&link->endpoint); - pn_hash_del(handles, key); - } -} - -void pni_transport_unbind_channels(pn_hash_t *channels) -{ - for (pn_handle_t h = pn_hash_head(channels); h; h = pn_hash_next(channels, h)) { - uintptr_t key = pn_hash_key(channels, h); - pn_session_t *ssn = (pn_session_t *) pn_hash_value(channels, h); - pni_delivery_map_clear(&ssn->state.incoming); - pni_delivery_map_clear(&ssn->state.outgoing); - pni_transport_unbind_handles(ssn->state.local_handles, true); - pni_transport_unbind_handles(ssn->state.remote_handles, true); - pn_session_unbound(ssn); - pn_ep_decref(&ssn->endpoint); - pn_hash_del(channels, key); - } -} - -int pn_transport_unbind(pn_transport_t *transport) -{ - assert(transport); - if (!transport->connection) return 0; - - - pn_connection_t *conn = transport->connection; - transport->connection = NULL; - bool was_referenced = transport->referenced; - - pn_collector_put(conn->collector, PN_OBJECT, conn, PN_CONNECTION_UNBOUND); - - // XXX: what happens if the endpoints are freed before we get here? - pn_session_t *ssn = pn_session_head(conn, 0); - while (ssn) { - pni_delivery_map_clear(&ssn->state.incoming); - pni_delivery_map_clear(&ssn->state.outgoing); - ssn = pn_session_next(ssn, 0); - } - - pn_endpoint_t *endpoint = conn->endpoint_head; - while (endpoint) { - pn_condition_clear(&endpoint->remote_condition); - pn_modified(conn, endpoint, true); - endpoint = endpoint->endpoint_next; - } - - pni_transport_unbind_channels(transport->local_channels); - pni_transport_unbind_channels(transport->remote_channels); - - pn_connection_unbound(conn); - if (was_referenced) { - pn_decref(conn); - } - return 0; -} - -pn_error_t *pn_transport_error(pn_transport_t *transport) -{ - assert(transport); - if (pn_condition_is_set(&transport->condition)) { - pn_error_format(transport->error, PN_ERR, "%s: %s", - pn_condition_get_name(&transport->condition), - pn_condition_get_description(&transport->condition)); - } else { - pn_error_clear(transport->error); - } - return transport->error; -} - -pn_condition_t *pn_transport_condition(pn_transport_t *transport) -{ - assert(transport); - return &transport->condition; -} - -static void pni_map_remote_handle(pn_link_t *link, uint32_t handle) -{ - link->state.remote_handle = handle; - pn_hash_put(link->session->state.remote_handles, handle, link); - pn_ep_incref(&link->endpoint); -} - -static void pni_unmap_remote_handle(pn_link_t *link) -{ - uintptr_t handle = link->state.remote_handle; - link->state.remote_handle = -2; - if (pn_hash_get(link->session->state.remote_handles, handle)) { - pn_ep_decref(&link->endpoint); - } - // may delete link: - pn_hash_del(link->session->state.remote_handles, handle); -} - -static pn_link_t *pni_handle_state(pn_session_t *ssn, uint32_t handle) -{ - return (pn_link_t *) pn_hash_get(ssn->state.remote_handles, handle); -} - -bool pni_disposition_batchable(pn_disposition_t *disposition) -{ - switch (disposition->type) { - case PN_ACCEPTED: - return true; - case PN_RELEASED: - return true; - default: - return false; - } -} - -static int pni_disposition_encode(pn_disposition_t *disposition, pn_data_t *data) -{ - pn_condition_t *cond = &disposition->condition; - switch (disposition->type) { - case PN_RECEIVED: - PN_RETURN_IF_ERROR(pn_data_put_list(data)); - pn_data_enter(data); - PN_RETURN_IF_ERROR(pn_data_put_uint(data, disposition->section_number)); - PN_RETURN_IF_ERROR(pn_data_put_ulong(data, disposition->section_offset)); - pn_data_exit(data); - return 0; - case PN_ACCEPTED: - case PN_RELEASED: - return 0; - case PN_REJECTED: - return pn_data_fill(data, "[?DL[sSC]]", pn_condition_is_set(cond), ERROR, - pn_condition_get_name(cond), - pn_condition_get_description(cond), - pn_condition_info(cond)); - case PN_MODIFIED: - return pn_data_fill(data, "[ooC]", - disposition->failed, - disposition->undeliverable, - disposition->annotations); - default: - return pn_data_copy(data, disposition->data); - } -} - - -void pn_do_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir, - pn_data_t *args, const char *payload, size_t size) -{ - if (transport->trace & PN_TRACE_FRM) { - pn_string_format(transport->scratch, "%u %s ", ch, dir == OUT ? "->" : "<-"); - pn_inspect(args, transport->scratch); - - if (pn_data_size(args)==0) { - pn_string_addf(transport->scratch, "(EMPTY FRAME)"); - } - - if (size) { - char buf[1024]; - int e = pn_quote_data(buf, 1024, payload, size); - pn_string_addf(transport->scratch, " (%" PN_ZU ") \"%s\"%s", size, buf, - e == PN_OVERFLOW ? "... (truncated)" : ""); - } - - pn_transport_log(transport, pn_string_get(transport->scratch)); - } -} - -int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const char *fmt, ...) -{ - pn_buffer_t *frame_buf = transport->frame; - va_list ap; - va_start(ap, fmt); - pn_data_clear(transport->output_args); - int err = pn_data_vfill(transport->output_args, fmt, ap); - va_end(ap); - if (err) { - pn_transport_logf(transport, - "error posting frame: %s, %s: %s", fmt, pn_code(err), - pn_error_text(pn_data_error(transport->output_args))); - return PN_ERR; - } - - pn_do_trace(transport, ch, OUT, transport->output_args, NULL, 0); - - encode_performatives: - pn_buffer_clear( frame_buf ); - pn_rwbytes_t buf = pn_buffer_memory( frame_buf ); - buf.size = pn_buffer_available( frame_buf ); - - ssize_t wr = pn_data_encode( transport->output_args, buf.start, buf.size ); - if (wr < 0) { - if (wr == PN_OVERFLOW) { - pn_buffer_ensure( frame_buf, pn_buffer_available( frame_buf ) * 2 ); - goto encode_performatives; - } - pn_transport_logf(transport, - "error posting frame: %s", pn_code(wr)); - return PN_ERR; - } - - pn_frame_t frame = {0,}; - frame.type = type; - frame.channel = ch; - frame.payload = buf.start; - frame.size = wr; - size_t n; - while (!(n = pn_write_frame(transport->output + transport->available, - transport->capacity - transport->available, frame))) { - transport->capacity *= 2; - transport->output = (char *) realloc(transport->output, transport->capacity); - } - transport->output_frames_ct += 1; - if (transport->trace & PN_TRACE_RAW) { - pn_string_set(transport->scratch, "RAW: \""); - pn_quote(transport->scratch, transport->output + transport->available, n); - pn_string_addf(transport->scratch, "\""); - pn_transport_log(transport, pn_string_get(transport->scratch)); - } - transport->available += n; - - return 0; -} - -static int pni_post_amqp_transfer_frame(pn_transport_t *transport, uint16_t ch, - uint32_t handle, - pn_sequence_t id, - pn_bytes_t *payload, - const pn_bytes_t *tag, - uint32_t message_format, - bool settled, - bool more, - pn_sequence_t frame_limit, - uint64_t code, - pn_data_t* state) -{ - bool more_flag = more; - int framecount = 0; - pn_buffer_t *frame = transport->frame; - - // create preformatives, assuming 'more' flag need not change - - compute_performatives: - pn_data_clear(transport->output_args); - int err = pn_data_fill(transport->output_args, "DL[IIzIoon?DLC]", TRANSFER, - handle, id, tag->size, tag->start, - message_format, - settled, more_flag, (bool)code, code, state); - if (err) { - pn_transport_logf(transport, - "error posting transfer frame: %s: %s", pn_code(err), - pn_error_text(pn_data_error(transport->output_args))); - return PN_ERR; - } - - do { // send as many frames as possible without changing the 'more' flag... - - encode_performatives: - pn_buffer_clear( frame ); - pn_rwbytes_t buf = pn_buffer_memory( frame ); - buf.size = pn_buffer_available( frame ); - - ssize_t wr = pn_data_encode(transport->output_args, buf.start, buf.size); - if (wr < 0) { - if (wr == PN_OVERFLOW) { - pn_buffer_ensure( frame, pn_buffer_available( frame ) * 2 ); - goto encode_performatives; - } - pn_transport_logf(transport, "error posting frame: %s", pn_code(wr)); - return PN_ERR; - } - buf.size = wr; - - // check if we need to break up the outbound frame - size_t available = payload->size; - if (transport->remote_max_frame) { - if ((available + buf.size) > transport->remote_max_frame - 8) { - available = transport->remote_max_frame - 8 - buf.size; - if (more_flag == false) { - more_flag = true; - goto compute_performatives; // deal with flag change - } - } else if (more_flag == true && more == false) { - // caller has no more, and this is the last frame - more_flag = false; - goto compute_performatives; - } - } - - if (pn_buffer_available( frame ) < (available + buf.size)) { - // not enough room for payload - try again... - pn_buffer_ensure( frame, available + buf.size ); - goto encode_performatives; - } - - pn_do_trace(transport, ch, OUT, transport->output_args, payload->start, available); - - memmove( buf.start + buf.size, payload->start, available); - payload->start += available; - payload->size -= available; - buf.size += available; - - pn_frame_t frame = {AMQP_FRAME_TYPE}; - frame.channel = ch; - frame.payload = buf.start; - frame.size = buf.size; - - size_t n; - while (!(n = pn_write_frame(transport->output + transport->available, - transport->capacity - transport->available, frame))) { - transport->capacity *= 2; - transport->output = (char *) realloc(transport->output, transport->capacity); - } - transport->output_frames_ct += 1; - framecount++; - if (transport->trace & PN_TRACE_RAW) { - pn_string_set(transport->scratch, "RAW: \""); - pn_quote(transport->scratch, transport->output + transport->available, n); - pn_string_addf(transport->scratch, "\""); - pn_transport_log(transport, pn_string_get(transport->scratch)); - } - transport->available += n; - } while (payload->size > 0 && framecount < frame_limit); - - return framecount; -} - -static int pni_post_close(pn_transport_t *transport, pn_condition_t *cond) -{ - if (!cond && transport->connection) { - cond = pn_connection_condition(transport->connection); - } - const char *condition = NULL; - const char *description = NULL; - pn_data_t *info = NULL; - if (pn_condition_is_set(cond)) { - condition = pn_condition_get_name(cond); - description = pn_condition_get_description(cond); - info = pn_condition_info(cond); - } - - return pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[?DL[sSC]]", CLOSE, - (bool) condition, ERROR, condition, description, info); -} - -static pn_collector_t *pni_transport_collector(pn_transport_t *transport) -{ - if (transport->connection && transport->connection->collector) { - return transport->connection->collector; - } else { - return NULL; - } -} - -static void pni_maybe_post_closed(pn_transport_t *transport) -{ - pn_collector_t *collector = pni_transport_collector(transport); - if (transport->head_closed && transport->tail_closed) { - pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_CLOSED); - } -} - -static void pni_close_tail(pn_transport_t *transport) -{ - if (!transport->tail_closed) { - transport->tail_closed = true; - pn_collector_t *collector = pni_transport_collector(transport); - pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_TAIL_CLOSED); - pni_maybe_post_closed(transport); - } -} - -int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...) -{ - va_list ap; - va_start(ap, fmt); - char buf[1024]; - if (fmt) { - // XXX: result - vsnprintf(buf, 1024, fmt, ap); - } else { - buf[0] = '\0'; - } - va_end(ap); - pn_condition_t *cond = &transport->condition; - if (!pn_condition_is_set(cond)) { - pn_condition_set_name(cond, condition); - if (fmt) { - pn_condition_set_description(cond, buf); - } - } else { - const char *first = pn_condition_get_description(cond); - if (first && fmt) { - char extended[2048]; - snprintf(extended, 2048, "%s (%s)", first, buf); - pn_condition_set_description(cond, extended); - } else if (fmt) { - pn_condition_set_description(cond, buf); - } - } - pn_collector_t *collector = pni_transport_collector(transport); - pn_collector_put(collector, PN_OBJECT, transport, PN_TRANSPORT_ERROR); - if (transport->trace & PN_TRACE_DRV) { - pn_transport_logf(transport, "ERROR %s %s", condition, buf); - } - - for (int i = 0; i<PN_IO_LAYER_CT; ++i) { - if (transport->io_layers[i] && transport->io_layers[i]->handle_error) - transport->io_layers[i]->handle_error(transport, i); - } - - pni_close_tail(transport); - return PN_ERR; -} - -static char *pn_bytes_strdup(pn_bytes_t str) -{ - return pn_strndup(str.start, str.size); -} - -int pn_do_open(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) -{ - pn_connection_t *conn = transport->connection; - bool container_q, hostname_q, remote_channel_max_q, remote_max_frame_q; - uint16_t remote_channel_max; - uint32_t remote_max_frame; - pn_bytes_t remote_container, remote_hostname; - pn_data_clear(transport->remote_offered_capabilities); - pn_data_clear(transport->remote_desired_capabilities); - pn_data_clear(transport->remote_properties); - int err = pn_data_scan(args, "D.[?S?S?I?HI..CCC]", - &container_q, &remote_container, - &hostname_q, &remote_hostname, - &remote_max_frame_q, &remote_max_frame, - &remote_channel_max_q, &remote_channel_max, - &transport->remote_idle_timeout, - transport->remote_offered_capabilities, - transport->remote_desired_capabilities, - transport->remote_properties); - if (err) return err; - /* - * The default value is already stored in the variable. - * But the scanner zeroes out values if it does not - * find them in the args, so don't give the variable - * directly to the scanner. - */ - if (remote_channel_max_q) { - transport->remote_channel_max = remote_channel_max; - } - - if (remote_max_frame_q) { - transport->remote_max_frame = remote_max_frame; - } - - if (transport->remote_max_frame > 0) { - if (transport->remote_max_frame < AMQP_MIN_MAX_FRAME_SIZE) { - pn_transport_logf(transport, "Peer advertised bad max-frame (%u), forcing to %u", - transport->remote_max_frame, AMQP_MIN_MAX_FRAME_SIZE); - transport->remote_max_frame = AMQP_MIN_MAX_FRAME_SIZE; - } - } - if (container_q) { - transport->remote_container = pn_bytes_strdup(remote_container); - } else { - transport->remote_container = NULL; - } - if (hostname_q) { - transport->remote_hostname = pn_bytes_strdup(remote_hostname); - } else { - transport->remote_hostname = NULL; - } - - if (conn) { - PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_ACTIVE); - pni_post_remote_open_events(transport, conn); - } else { - transport->halt = true; - } - transport->open_rcvd = true; - pni_calculate_channel_max(transport); - return 0; -} - -int pn_do_begin(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) -{ - bool reply; - uint16_t remote_channel; - pn_sequence_t next; - int err = pn_data_scan(args, "D.[?HI]", &reply, &remote_channel, &next); - if (err) return err; - - // AMQP 1.0 section 2.7.1 - if the peer doesn't honor our channel_max -- - // express our displeasure by closing the connection with a framing error. - if (remote_channel > transport->channel_max) { - pn_do_error(transport, - "amqp:connection:framing-error", - "remote channel %d is above negotiated channel_max %d.", - remote_channel, - transport->channel_max - ); - return PN_TRANSPORT_ERROR; - } - - pn_session_t *ssn; - if (reply) { - ssn = (pn_session_t *) pn_hash_get(transport->local_channels, remote_channel); - } else { - ssn = pn_session(transport->connection); - } - if (ssn == 0) { - pn_do_error(transport, - "amqp:connection:framing-error", - "remote channel is above negotiated channel_max %d.", - transport->channel_max - ); - return PN_TRANSPORT_ERROR; - } - ssn->state.incoming_transfer_count = next; - pni_map_remote_channel(ssn, channel); - PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_ACTIVE); - pn_collector_put(transport->connection->collector, PN_OBJECT, ssn, PN_SESSION_REMOTE_OPEN); - return 0; -} - -pn_link_t *pn_find_link(pn_session_t *ssn, pn_bytes_t name, bool is_sender) -{ - pn_endpoint_type_t type = is_sender ? SENDER : RECEIVER; - - for (size_t i = 0; i < pn_list_size(ssn->links); i++) - { - pn_link_t *link = (pn_link_t *) pn_list_get(ssn->links, i); - if (link->endpoint.type == type && - // This function is used to locate the link object for an - // incoming attach. If a link object of the same name is found - // which is closed both locally and remotely, assume that is - // no longer in use. - !((link->endpoint.state & PN_LOCAL_CLOSED) && (link->endpoint.state & PN_REMOTE_CLOSED)) && - !strncmp(name.start, pn_string_get(link->name), name.size)) - { - return link; - } - } - return NULL; -} - -static pn_expiry_policy_t symbol2policy(pn_bytes_t symbol) -{ - if (!symbol.start) - return PN_EXPIRE_WITH_SESSION; - - if (!strncmp(symbol.start, "link-detach", symbol.size)) - return PN_EXPIRE_WITH_LINK; - if (!strncmp(symbol.start, "session-end", symbol.size)) - return PN_EXPIRE_WITH_SESSION; - if (!strncmp(symbol.start, "connection-close", symbol.size)) - return PN_EXPIRE_WITH_CONNECTION; - if (!strncmp(symbol.start, "never", symbol.size)) - return PN_EXPIRE_NEVER; - - return PN_EXPIRE_WITH_SESSION; -} - -static pn_distribution_mode_t symbol2dist_mode(const pn_bytes_t symbol) -{ - if (!symbol.start) - return PN_DIST_MODE_UNSPECIFIED; - - if (!strncmp(symbol.start, "move", symbol.size)) - return PN_DIST_MODE_MOVE; - if (!strncmp(symbol.start, "copy", symbol.size)) - return PN_DIST_MODE_COPY; - - return PN_DIST_MODE_UNSPECIFIED; -} - -static const char *dist_mode2symbol(const pn_distribution_mode_t mode) -{ - switch (mode) - { - case PN_DIST_MODE_COPY: - return "copy"; - case PN_DIST_MODE_MOVE: - return "move"; - default: - return NULL; - } -} - -int pn_terminus_set_address_bytes(pn_terminus_t *terminus, pn_bytes_t address) -{ - assert(terminus); - return pn_string_setn(terminus->address, address.start, address.size); -} - -int pn_do_attach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) -{ - pn_bytes_t name; - uint32_t handle; - bool is_sender; - pn_bytes_t source, target; - pn_durability_t src_dr, tgt_dr; - pn_bytes_t src_exp, tgt_exp; - pn_seconds_t src_timeout, tgt_timeout; - bool src_dynamic, tgt_dynamic; - pn_sequence_t idc; - pn_bytes_t dist_mode; - bool snd_settle, rcv_settle; - uint8_t snd_settle_mode, rcv_settle_mode; - int err = pn_data_scan(args, "D.[SIo?B?BD.[SIsIo.s]D.[SIsIo]..I]", &name, &handle, - &is_sender, - &snd_settle, &snd_settle_mode, - &rcv_settle, &rcv_settle_mode, - &source, &src_dr, &src_exp, &src_timeout, &src_dynamic, &dist_mode, - &target, &tgt_dr, &tgt_exp, &tgt_timeout, &tgt_dynamic, - &idc); - if (err) return err; - char strbuf[128]; // avoid malloc for most link names - char *strheap = (name.size >= sizeof(strbuf)) ? (char *) malloc(name.size + 1) : NULL; - char *strname = strheap ? strheap : strbuf; - strncpy(strname, name.start, name.size); - strname[name.size] = '\0'; - - pn_session_t *ssn = pni_channel_state(transport, channel); - if (!ssn) { - pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); - if (strheap) free(strheap); - return PN_EOS; - } - pn_link_t *link = pn_find_link(ssn, name, is_sender); - if (!link) { - if (is_sender) { - link = (pn_link_t *) pn_sender(ssn, strname); - } else { - link = (pn_link_t *) pn_receiver(ssn, strname); - } - } - - if (strheap) { - free(strheap); - } - - pni_map_remote_handle(link, handle); - PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_ACTIVE); - pn_terminus_t *rsrc = &link->remote_source; - if (source.start || src_dynamic) { - pn_terminus_set_type(rsrc, PN_SOURCE); - pn_terminus_set_address_bytes(rsrc, source); - pn_terminus_set_durability(rsrc, src_dr); - pn_terminus_set_expiry_policy(rsrc, symbol2policy(src_exp)); - pn_terminus_set_timeout(rsrc, src_timeout); - pn_terminus_set_dynamic(rsrc, src_dynamic); - pn_terminus_set_distribution_mode(rsrc, symbol2dist_mode(dist_mode)); - } else { - pn_terminus_set_type(rsrc, PN_UNSPECIFIED); - } - pn_terminus_t *rtgt = &link->remote_target; - if (target.start || tgt_dynamic) { - pn_terminus_set_type(rtgt, PN_TARGET); - pn_terminus_set_address_bytes(rtgt, target); - pn_terminus_set_durability(rtgt, tgt_dr); - pn_terminus_set_expiry_policy(rtgt, symbol2policy(tgt_exp)); - pn_terminus_set_timeout(rtgt, tgt_timeout); - pn_terminus_set_dynamic(rtgt, tgt_dynamic); - } else { - uint64_t code = 0; - pn_data_clear(link->remote_target.capabilities); - err = pn_data_scan(args, "D.[.....D..DL[C]...]", &code, - link->remote_target.capabilities); - if (err) return err; - if (code == COORDINATOR) { - pn_terminus_set_type(rtgt, PN_COORDINATOR); - } else if (code == TARGET) { - pn_terminus_set_type(rtgt, PN_TARGET); - } else { - pn_terminus_set_type(rtgt, PN_UNSPECIFIED); - } - } - - if (snd_settle) - link->remote_snd_settle_mode = snd_settle_mode; - if (rcv_settle) - link->remote_rcv_settle_mode = rcv_settle_mode; - - pn_data_clear(link->remote_source.properties); - pn_data_clear(link->remote_source.filter); - pn_data_clear(link->remote_source.outcomes); - pn_data_clear(link->remote_source.capabilities); - pn_data_clear(link->remote_target.properties); - pn_data_clear(link->remote_target.capabilities); - - err = pn_data_scan(args, "D.[.....D.[.....C.C.CC]D.[.....CC]", - link->remote_source.properties, - link->remote_source.filter, - link->remote_source.outcomes, - link->remote_source.capabilities, - link->remote_target.properties, - link->remote_target.capabilities); - if (err) return err; - - pn_data_rewind(link->remote_source.properties); - pn_data_rewind(link->remote_source.filter); - pn_data_rewind(link->remote_source.outcomes); - pn_data_rewind(link->remote_source.capabilities); - pn_data_rewind(link->remote_target.properties); - pn_data_rewind(link->remote_target.capabilities); - - if (!is_sender) { - link->state.delivery_count = idc; - } - - pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_REMOTE_OPEN); - return 0; -} - -static int pni_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t *link); - -// free the delivery -static void pn_full_settle(pn_delivery_map_t *db, pn_delivery_t *delivery) -{ - assert(!delivery->work); - pn_clear_tpwork(delivery); - pn_delivery_map_del(db, delivery); - pn_incref(delivery); - pn_decref(delivery); -} - -int pn_do_transfer(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) -{ - // XXX: multi transfer - uint32_t handle; - pn_bytes_t tag; - bool id_present; - pn_sequence_t id; - bool settled; - bool more; - bool has_type; - uint64_t type; - pn_data_clear(transport->disp_data); - int err = pn_data_scan(args, "D.[I?Iz.oo.D?LC]", &handle, &id_present, &id, &tag, - &settled, &more, &has_type, &type, transport->disp_data); - if (err) return err; - pn_session_t *ssn = pni_channel_state(transport, channel); - if (!ssn) { - return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); - } - - if (!ssn->state.incoming_window) { - return pn_do_error(transport, "amqp:session:window-violation", "incoming session window exceeded"); - } - - pn_link_t *link = pni_handle_state(ssn, handle); - if (!link) { - return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle); - } - pn_delivery_t *delivery; - if (link->unsettled_tail && !link->unsettled_tail->done) { - delivery = link->unsettled_tail; - } else { - pn_delivery_map_t *incoming = &ssn->state.incoming; - - if (!ssn->state.incoming_init) { - incoming->next = id; - ssn->state.incoming_init = true; - ssn->incoming_deliveries++; - } - - delivery = pn_delivery(link, pn_dtag(tag.start, tag.size)); - pn_delivery_state_t *state = pni_delivery_map_push(incoming, delivery); - if (id_present && id != state->id) { - return pn_do_error(transport, "amqp:session:invalid-field", - "sequencing error, expected delivery-id %u, got %u", - state->id, id); - } - if (has_type) { - delivery->remote.type = type; - pn_data_copy(delivery->remote.data, transport->disp_data); - } - - link->state.delivery_count++; - link->state.link_credit--; - link->queued++; - - // XXX: need to fill in remote state: delivery->remote.state = ...; - delivery->remote.settled = settled; - if (settled) { - delivery->updated = true; - pn_work_update(transport->connection, delivery); - } - } - - pn_buffer_append(delivery->bytes, payload->start, payload->size); - ssn->incoming_bytes += payload->size; - delivery->done = !more; - - ssn->state.incoming_transfer_count++; - ssn->state.incoming_window--; - - // XXX: need better policy for when to refresh window - if (!ssn->state.incoming_window && (int32_t) link->state.local_handle >= 0) { - pni_post_flow(transport, ssn, link); - } - - pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY); - return 0; -} - -int pn_do_flow(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) -{ - pn_sequence_t onext, inext, delivery_count; - uint32_t iwin, owin, link_credit; - uint32_t handle; - bool inext_init, handle_init, dcount_init, drain; - int err = pn_data_scan(args, "D.[?IIII?I?II.o]", &inext_init, &inext, &iwin, - &onext, &owin, &handle_init, &handle, &dcount_init, - &delivery_count, &link_credit, &drain); - if (err) return err; - - pn_session_t *ssn = pni_channel_state(transport, channel); - if (!ssn) { - return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); - } - - if (inext_init) { - ssn->state.remote_incoming_window = inext + iwin - ssn->state.outgoing_transfer_count; - } else { - ssn->state.remote_incoming_window = iwin; - } - - if (handle_init) { - pn_link_t *link = pni_handle_state(ssn, handle); - if (!link) { - return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle); - } - if (link->endpoint.type == SENDER) { - pn_sequence_t receiver_count; - if (dcount_init) { - receiver_count = delivery_count; - } else { - // our initial delivery count - receiver_count = 0; - } - pn_sequence_t old = link->state.link_credit; - link->state.link_credit = receiver_count + link_credit - link->state.delivery_count; - link->credit += link->state.link_credit - old; - link->drain = drain; - pn_delivery_t *delivery = pn_link_current(link); - if (delivery) pn_work_update(transport->connection, delivery); - } else { - pn_sequence_t delta = delivery_count - link->state.delivery_count; - if (delta > 0) { - link->state.delivery_count += delta; - link->state.link_credit -= delta; - link->credit -= delta; - link->drained += delta; - } - } - - pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_FLOW); - } - - return 0; -} - -#define SCAN_ERROR_DEFAULT ("D.[D.[sSC]") -#define SCAN_ERROR_DETACH ("D.[..D.[sSC]") -#define SCAN_ERROR_DISP ("[D.[sSC]") - -static int pn_scan_error(pn_data_t *data, pn_condition_t *condition, const char *fmt) -{ - pn_bytes_t cond; - pn_bytes_t desc; - pn_condition_clear(condition); - int err = pn_data_scan(data, fmt, &cond, &desc, condition->info); - if (err) return err; - pn_string_setn(condition->name, cond.start, cond.size); - pn_string_setn(condition->description, desc.start, desc.size); - pn_data_rewind(condition->info); - return 0; -} - -int pn_do_disposition(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) -{ - bool role; - pn_sequence_t first, last; - uint64_t type = 0; - bool last_init, settled, type_init; - pn_data_clear(transport->disp_data); - int err = pn_data_scan(args, "D.[oI?IoD?LC]", &role, &first, &last_init, - &last, &settled, &type_init, &type, - transport->disp_data); - if (err) return err; - if (!last_init) last = first; - - pn_session_t *ssn = pni_channel_state(transport, channel); - if (!ssn) { - return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); - } - - pn_delivery_map_t *deliveries; - if (role) { - deliveries = &ssn->state.outgoing; - } else { - deliveries = &ssn->state.incoming; - } - - pn_data_rewind(transport->disp_data); - bool remote_data = (pn_data_next(transport->disp_data) && - pn_data_get_list(transport->disp_data) > 0); - - for (pn_sequence_t id = first; id <= last; id++) { - pn_delivery_t *delivery = pni_delivery_map_get(deliveries, id); - pn_disposition_t *remote = &delivery->remote; - if (delivery) { - if (type_init) remote->type = type; - if (remote_data) { - switch (type) { - case PN_RECEIVED: - pn_data_rewind(transport->disp_data); - pn_data_next(transport->disp_data); - pn_data_enter(transport->disp_data); - if (pn_data_next(transport->disp_data)) - remote->section_number = pn_data_get_uint(transport->disp_data); - if (pn_data_next(transport->disp_data)) - remote->section_offset = pn_data_get_ulong(transport->disp_data); - break; - case PN_ACCEPTED: - break; - case PN_REJECTED: - err = pn_scan_error(transport->disp_data, &remote->condition, SCAN_ERROR_DISP); - if (err) return err; - break; - case PN_RELEASED: - break; - case PN_MODIFIED: - pn_data_rewind(transport->disp_data); - pn_data_next(transport->disp_data); - pn_data_enter(transport->disp_data); - if (pn_data_next(transport->disp_data)) - remote->failed = pn_data_get_bool(transport->disp_data); - if (pn_data_next(transport->disp_data)) - remote->undeliverable = pn_data_get_bool(transport->disp_data); - pn_data_narrow(transport->disp_data); - pn_data_clear(remote->data); - pn_data_appendn(remote->annotations, transport->disp_data, 1); - pn_data_widen(transport->disp_data); - break; - default: - pn_data_copy(remote->data, transport->disp_data); - break; - } - } - remote->settled = settled; - delivery->updated = true; - pn_work_update(transport->connection, delivery); - - pn_collector_put(transport->connection->collector, PN_OBJECT, delivery, PN_DELIVERY); - } - } - - return 0; -} - -int pn_do_detach(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) -{ - uint32_t handle; - bool closed; - int err = pn_data_scan(args, "D.[Io]", &handle, &closed); - if (err) return err; - - pn_session_t *ssn = pni_channel_state(transport, channel); - if (!ssn) { - return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); - } - pn_link_t *link = pni_handle_state(ssn, handle); - if (!link) { - return pn_do_error(transport, "amqp:invalid-field", "no such handle: %u", handle); - } - - err = pn_scan_error(args, &link->endpoint.remote_condition, SCAN_ERROR_DETACH); - if (err) return err; - - if (closed) - { - PN_SET_REMOTE(link->endpoint.state, PN_REMOTE_CLOSED); - pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_REMOTE_CLOSE); - } else { - pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_REMOTE_DETACH); - } - - pni_unmap_remote_handle(link); - return 0; -} - -int pn_do_end(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) -{ - pn_session_t *ssn = pni_channel_state(transport, channel); - if (!ssn) { - return pn_do_error(transport, "amqp:not-allowed", "no such channel: %u", channel); - } - int err = pn_scan_error(args, &ssn->endpoint.remote_condition, SCAN_ERROR_DEFAULT); - if (err) return err; - PN_SET_REMOTE(ssn->endpoint.state, PN_REMOTE_CLOSED); - pn_collector_put(transport->connection->collector, PN_OBJECT, ssn, PN_SESSION_REMOTE_CLOSE); - pni_unmap_remote_channel(ssn); - return 0; -} - -int pn_do_close(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) -{ - pn_connection_t *conn = transport->connection; - int err = pn_scan_error(args, &transport->remote_condition, SCAN_ERROR_DEFAULT); - if (err) return err; - transport->close_rcvd = true; - PN_SET_REMOTE(conn->endpoint.state, PN_REMOTE_CLOSED); - pn_collector_put(transport->connection->collector, PN_OBJECT, conn, PN_CONNECTION_REMOTE_CLOSE); - return 0; -} - -// deprecated -ssize_t pn_transport_input(pn_transport_t *transport, const char *bytes, size_t available) -{ - if (!transport) return PN_ARG_ERR; - if (available == 0) { - return pn_transport_close_tail(transport); - } - const size_t original = available; - ssize_t capacity = pn_transport_capacity(transport); - if (capacity < 0) return capacity; - while (available && capacity) { - char *dest = pn_transport_tail(transport); - assert(dest); - size_t count = pn_min( (size_t)capacity, available ); - memmove( dest, bytes, count ); - available -= count; - bytes += count; - int rc = pn_transport_process( transport, count ); - if (rc < 0) return rc; - capacity = pn_transport_capacity(transport); - if (capacity < 0) return capacity; - } - - return original - available; -} - -// process pending input until none remaining or EOS -static ssize_t transport_consume(pn_transport_t *transport) -{ - // This allows whatever is driving the I/O to set the error - // condition on the transport before doing pn_transport_close_head() - // or pn_transport_close_tail(). This allows all transport errors to - // flow to the app the same way, but provides cleaner error messages - // since we don't try to look for a protocol header when, e.g. the - // connection was refused. - if (!transport->bytes_input && transport->tail_closed && - pn_condition_is_set(&transport->condition)) { - pn_do_error(transport, NULL, NULL); - return PN_EOS; - } - - size_t consumed = 0; - - while (transport->input_pending || transport->tail_closed) { - ssize_t n; - n = transport->io_layers[0]-> - process_input( transport, 0, - transport->input_buf + consumed, - transport->input_pending ); - if (n > 0) { - consumed += n; - transport->input_pending -= n; - } else if (n == 0) { - break; - } else { - assert(n == PN_EOS); - if (transport->trace & (PN_TRACE_RAW | PN_TRACE_FRM)) - pn_transport_log(transport, " <- EOS"); - transport->input_pending = 0; // XXX ??? - return n; - } - } - - if (transport->input_pending && consumed) { - memmove( transport->input_buf, &transport->input_buf[consumed], transport->input_pending ); - } - - return consumed; -} - -static int pni_process_conn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) -{ - if (endpoint->type == CONNECTION) - { - if (!(endpoint->state & PN_LOCAL_UNINIT) && !transport->open_sent) - { - // as per the recommendation in the spec, advertise half our - // actual timeout to the remote - const pn_millis_t idle_timeout = transport->local_idle_timeout - ? (transport->local_idle_timeout/2) - : 0; - pn_connection_t *connection = (pn_connection_t *) endpoint; - const char *cid = pn_string_get(connection->container); - pni_calculate_channel_max(transport); - int err = pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[SS?I?H?InnCCC]", OPEN, - cid ? cid : "", - pn_string_get(connection->hostname), - // if not zero, advertise our max frame size and idle timeout - (bool)transport->local_max_frame, transport->local_max_frame, - (bool)transport->channel_max, transport->channel_max, - (bool)idle_timeout, idle_timeout, - connection->offered_capabilities, - connection->desired_capabilities, - connection->properties); - if (err) return err; - transport->open_sent = true; - } - } - - return 0; -} - -static uint16_t allocate_alias(pn_hash_t *aliases, uint32_t max_index, int * valid) -{ - for (uint32_t i = 0; i <= max_index; i++) { - if (!pn_hash_get(aliases, i)) { - * valid = 1; - return i; - } - } - - * valid = 0; - return 0; -} - -static size_t pni_session_outgoing_window(pn_session_t *ssn) -{ - return ssn->outgoing_window; -} - -static size_t pni_session_incoming_window(pn_session_t *ssn) -{ - uint32_t size = ssn->connection->transport->local_max_frame; - if (!size) { - return 2147483647; // biggest legal value - } else { - return (ssn->incoming_capacity - ssn->incoming_bytes)/size; - } -} - -static int pni_map_local_channel(pn_session_t *ssn) -{ - pn_transport_t *transport = ssn->connection->transport; - pn_session_state_t *state = &ssn->state; - int valid; - uint16_t channel = allocate_alias(transport->local_channels, transport->channel_max, & valid); - if (!valid) { - return 0; - } - state->local_channel = channel; - pn_hash_put(transport->local_channels, channel, ssn); - pn_ep_incref(&ssn->endpoint); - return 1; -} - -static int pni_process_ssn_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) -{ - if (endpoint->type == SESSION && transport->open_sent) - { - pn_session_t *ssn = (pn_session_t *) endpoint; - pn_session_state_t *state = &ssn->state; - if (!(endpoint->state & PN_LOCAL_UNINIT) && state->local_channel == (uint16_t) -1) - { - if (! pni_map_local_channel(ssn)) { - pn_transport_logf(transport, "unable to find an open available channel within limit of %d", transport->channel_max ); - return PN_ERR; - } - state->incoming_window = pni_session_incoming_window(ssn); - state->outgoing_window = pni_session_outgoing_window(ssn); - pn_post_frame(transport, AMQP_FRAME_TYPE, state->local_channel, "DL[?HIII]", BEGIN, - ((int16_t) state->remote_channel >= 0), state->remote_channel, - state->outgoing_transfer_count, - state->incoming_window, - state->outgoing_window); - } - } - - return 0; -} - -static const char *expiry_symbol(pn_expiry_policy_t policy) -{ - switch (policy) - { - case PN_EXPIRE_WITH_LINK: - return "link-detach"; - case PN_EXPIRE_WITH_SESSION: - return NULL; - case PN_EXPIRE_WITH_CONNECTION: - return "connection-close"; - case PN_EXPIRE_NEVER: - return "never"; - } - return NULL; -} - -static int pni_map_local_handle(pn_link_t *link) { - pn_link_state_t *state = &link->state; - pn_session_state_t *ssn_state = &link->session->state; - int valid; - // XXX TODO MICK: once changes are made to handle_max, change this hardcoded value to something reasonable. - state->local_handle = allocate_alias(ssn_state->local_handles, 65536, & valid); - if ( ! valid ) - return 0; - pn_hash_put(ssn_state->local_handles, state->local_handle, link); - pn_ep_incref(&link->endpoint); - return 1; -} - -static int pni_process_link_setup(pn_transport_t *transport, pn_endpoint_t *endpoint) -{ - if (transport->open_sent && (endpoint->type == SENDER || - endpoint->type == RECEIVER)) - { - pn_link_t *link = (pn_link_t *) endpoint; - pn_session_state_t *ssn_state = &link->session->state; - pn_link_state_t *state = &link->state; - if (((int16_t) ssn_state->local_channel >= 0) && - !(endpoint->state & PN_LOCAL_UNINIT) && state->local_handle == (uint32_t) -1) - { - pni_map_local_handle(link); - const pn_distribution_mode_t dist_mode = link->source.distribution_mode; - if (link->target.type == PN_COORDINATOR) { - int err = pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel, - "DL[SIoBB?DL[SIsIoC?sCnCC]DL[C]nnI]", ATTACH, - pn_string_get(link->name), - state->local_handle, - endpoint->type == RECEIVER, - link->snd_settle_mode, - link->rcv_settle_mode, - (bool) link->source.type, SOURCE, - pn_string_get(link->source.address), - link->source.durability, - expiry_symbol(link->source.expiry_policy), - link->source.timeout, - link->source.dynamic, - link->source.properties, - (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode), - link->source.filter, - link->source.outcomes, - link->source.capabilities, - COORDINATOR, link->target.capabilities, - 0); - if (err) return err; - } else { - int err = pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel, - "DL[SIoBB?DL[SIsIoC?sCnCC]?DL[SIsIoCC]nnI]", ATTACH, - pn_string_get(link->name), - state->local_handle, - endpoint->type == RECEIVER, - link->snd_settle_mode, - link->rcv_settle_mode, - (bool) link->source.type, SOURCE, - pn_string_get(link->source.address), - link->source.durability, - expiry_symbol(link->source.expiry_policy), - link->source.timeout, - link->source.dynamic, - link->source.properties, - (dist_mode != PN_DIST_MODE_UNSPECIFIED), dist_mode2symbol(dist_mode), - link->source.filter, - link->source.outcomes, - link->source.capabilities, - (bool) link->target.type, TARGET, - pn_string_get(link->target.address), - link->target.durability, - expiry_symbol(link->target.expiry_policy), - link->target.timeout, - link->target.dynamic, - link->target.properties, - link->target.capabilities, - 0); - if (err) return err; - } - } - } - - return 0; -} - -static int pni_post_flow(pn_transport_t *transport, pn_session_t *ssn, pn_link_t *link) -{ - ssn->state.incoming_window = pni_session_incoming_window(ssn); - ssn->state.outgoing_window = pni_session_outgoing_window(ssn); - bool linkq = (bool) link; - pn_link_state_t *state = &link->state; - return pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel, "DL[?IIII?I?I?In?o]", FLOW, - (int16_t) ssn->state.remote_channel >= 0, ssn->state.incoming_transfer_count, - ssn->state.incoming_window, - ssn->state.outgoing_transfer_count, - ssn->state.outgoing_window, - linkq, linkq ? state->local_handle : 0, - linkq, linkq ? state->delivery_count : 0, - linkq, linkq ? state->link_credit : 0, - linkq, linkq ? link->drain : false); -} - -static int pni_process_flow_receiver(pn_transport_t *transport, pn_endpoint_t *endpoint) -{ - if (endpoint->type == RECEIVER && endpoint->state & PN_LOCAL_ACTIVE) - { - pn_link_t *rcv = (pn_link_t *) endpoint; - pn_session_t *ssn = rcv->session; - pn_link_state_t *state = &rcv->state; - if ((int16_t) ssn->state.local_channel >= 0 && - (int32_t) state->local_handle >= 0 && - ((rcv->drain || state->link_credit != rcv->credit - rcv->queued) || !ssn->state.incoming_window)) { - state->link_credit = rcv->credit - rcv->queued; - return pni_post_flow(transport, ssn, rcv); - } - } - - return 0; -} - -static int pni_flush_disp(pn_transport_t *transport, pn_session_t *ssn) -{ - uint64_t code = ssn->state.disp_code; - bool settled = ssn->state.disp_settled; - if (ssn->state.disp) { - int err = pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel, "DL[oIIo?DL[]]", DISPOSITION, - ssn->state.disp_type, ssn->state.disp_first, ssn->state.disp_last, - settled, (bool)code, code); - if (err) return err; - ssn->state.disp_type = 0; - ssn->state.disp_code = 0; - ssn->state.disp_settled = 0; - ssn->state.disp_first = 0; - ssn->state.disp_last = 0; - ssn->state.disp = false; - } - return 0; -} - -static int pni_post_disp(pn_transport_t *transport, pn_delivery_t *delivery) -{ - pn_link_t *link = delivery->link; - pn_session_t *ssn = link->session; - pn_session_state_t *ssn_state = &ssn->state; - pn_modified(transport->connection, &link->session->endpoint, false); - pn_delivery_state_t *state = &delivery->state; - assert(state->init); - bool role = (link->endpoint.type == RECEIVER); - uint64_t code = delivery->local.type; - - if (!code && !delivery->local.settled) { - return 0; - } - - if (!pni_disposition_batchable(&delivery->local)) { - pn_data_clear(transport->disp_data); - PN_RETURN_IF_ERROR(pni_disposition_encode(&delivery->local, transport->disp_data)); - return pn_post_frame(transport, AMQP_FRAME_TYPE, ssn->state.local_channel, - "DL[oIIo?DLC]", DISPOSITION, - role, state->id, state->id, delivery->local.settled, - (bool)code, code, transport->disp_data); - } - - if (ssn_state->disp && code == ssn_state->disp_code && - delivery->local.settled == ssn_state->disp_settled && - ssn_state->disp_type == role) { - if (state->id == ssn_state->disp_first - 1) { - ssn_state->disp_first = state->id; - return 0; - } else if (state->id == ssn_state->disp_last + 1) { - ssn_state->disp_last = state->id; - return 0; - } - } - - if (ssn_state->disp) { - int err = pni_flush_disp(transport, ssn); - if (err) return err; - } - - ssn_state->disp_type = role; - ssn_state->disp_code = code; - ssn_state->disp_settled = delivery->local.settled; - ssn_state->disp_first = state->id; - ssn_state->disp_last = state->id; - ssn_state->disp = true; - - return 0; -} - -static int pni_process_tpwork_sender(pn_transport_t *transport, pn_delivery_t *delivery, bool *settle) -{ - *settle = false; - pn_link_t *link = delivery->link; - pn_session_state_t *ssn_state = &link->session->state; - pn_link_state_t *link_state = &link->state; - bool xfr_posted = false; - if ((int16_t) ssn_state->local_channel >= 0 && (int32_t) link_state->local_handle >= 0) { - pn_delivery_state_t *state = &delivery->state; - if (!state->sent && (delivery->done || pn_buffer_size(delivery->bytes) > 0) && - ssn_state->remote_incoming_window > 0 && link_state->link_credit > 0) { - if (!state->init) { - state = pni_delivery_map_push(&ssn_state->outgoing, delivery); - } - - pn_bytes_t bytes = pn_buffer_bytes(delivery->bytes); - size_t full_size = bytes.size; - pn_bytes_t tag = pn_buffer_bytes(delivery->tag); - pn_data_clear(transport->disp_data); - PN_RETURN_IF_ERROR(pni_disposition_encode(&delivery->local, transport->disp_data)); - int count = pni_post_amqp_transfer_frame(transport, - ssn_state->local_channel, - link_state->local_handle, - state->id, &bytes, &tag, - 0, // message-format - delivery->local.settled, - !delivery->done, - ssn_state->remote_incoming_window, - delivery->local.type, transport->disp_data); - if (count < 0) return count; - xfr_posted = true; - ssn_state->outgoing_transfer_count += count; - ssn_state->remote_incoming_window -= count; - - int sent = full_size - bytes.size; - pn_buffer_trim(delivery->bytes, sent, 0); - link->session->outgoing_bytes -= sent; - if (!pn_buffer_size(delivery->bytes) && delivery->done) { - state->sent = true; - link_state->delivery_count++; - link_state->link_credit--; - link->queued--; - link->session->outgoing_deliveries--; - } - - pn_collector_put(transport->connection->collector, PN_OBJECT, link, PN_LINK_FLOW); - } - } - - pn_delivery_state_t *state = delivery->state.init ? &delivery->state : NULL; - if ((int16_t) ssn_state->local_channel >= 0 && !delivery->remote.settled - && state && state->sent && !xfr_posted) { - int err = pni_post_disp(transport, delivery); - if (err) return err; - } - - *settle = delivery->local.settled && state && state->sent; - return 0; -} - -static int pni_process_tpwork_receiver(pn_transport_t *transport, pn_delivery_t *delivery, bool *settle) -{ - *settle = false; - pn_link_t *link = delivery->link; - // XXX: need to prevent duplicate disposition sending - pn_session_t *ssn = link->session; - if ((int16_t) ssn->state.local_channel >= 0 && !delivery->remote.settled && delivery->state.init) { - int err = pni_post_disp(transport, delivery); - if (err) return err; - } - - // XXX: need to centralize this policy and improve it - if (!ssn->state.incoming_window) { - int err = pni_post_flow(transport, ssn, link); - if (err) return err; - } - - *settle = delivery->local.settled; - return 0; -} - -static int pni_process_tpwork(pn_transport_t *transport, pn_endpoint_t *endpoint) -{ - if (endpoint->type == CONNECTION && !transport->close_sent) - { - pn_connection_t *conn = (pn_connection_t *) endpoint; - pn_delivery_t *delivery = conn->tpwork_head; - while (delivery) - { - pn_delivery_t *tp_next = delivery->tpwork_next; - bool settle = false; - - pn_link_t *link = delivery->link; - pn_delivery_map_t *dm = NULL; - if (pn_link_is_sender(link)) { - dm = &link->session->state.outgoing; - int err = pni_process_tpwork_sender(transport, delivery, &settle); - if (err) return err; - } else { - dm = &link->session->state.incoming; - int err = pni_process_tpwork_receiver(transport, delivery, &settle); - if (err) return err; - } - - if (settle) { - pn_full_settle(dm, delivery); - } else if (!pn_delivery_buffered(delivery)) { - pn_clear_tpwork(delivery); - } - - delivery = tp_next; - } - } - - return 0; -} - -static int pni_process_flush_disp(pn_transport_t *transport, pn_endpoint_t *endpoint) -{ - if (endpoint->type == SESSION) { - pn_session_t *session = (pn_session_t *) endpoint; - pn_session_state_t *state = &session->state; - if ((int16_t) state->local_channel >= 0 && !transport->close_sent) - { - int err = pni_flush_disp(transport, session); - if (err) return err; - } - } - - return 0; -} - -static int pni_process_flow_sender(pn_transport_t *transport, pn_endpoint_t *endpoint) -{ - if (endpoint->type == SENDER && endpoint->state & PN_LOCAL_ACTIVE) - { - pn_link_t *snd = (pn_link_t *) endpoint; - pn_session_t *ssn = snd->session; - pn_link_state_t *state = &snd->state; - if ((int16_t) ssn->state.local_channel >= 0 && - (int32_t) state->local_handle >= 0 && - snd->drain && snd->drained) { - pn_delivery_t *tail = snd->unsettled_tail; - if (!tail || !pn_delivery_buffered(tail)) { - state->delivery_count += state->link_credit; - state->link_credit = 0; - snd->drained = 0; - return pni_post_flow(transport, ssn, snd); - } - } - } - - return 0; -} - -static void pni_unmap_local_handle(pn_link_t *link) { - pn_link_state_t *state = &link->state; - uintptr_t handle = state->local_handle; - state->local_handle = -2; - if (pn_hash_get(link->session->state.local_handles, handle)) { - pn_ep_decref(&link->endpoint); - } - // may delete link - pn_hash_del(link->session->state.local_handles, handle); -} - -static int pni_process_link_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) -{ - if (endpoint->type == SENDER || endpoint->type == RECEIVER) - { - pn_link_t *link = (pn_link_t *) endpoint; - pn_session_t *session = link->session; - pn_session_state_t *ssn_state = &session->state; - pn_link_state_t *state = &link->state; - if (((endpoint->state & PN_LOCAL_CLOSED) || link->detached) && (int32_t) state->local_handle >= 0 && - (int16_t) ssn_state->local_channel >= 0 && !transport->close_sent) { - if (pn_link_is_sender(link) && pn_link_queued(link) && - (int32_t) state->remote_handle != -2 && - (int16_t) ssn_state->remote_channel != -2 && - !transport->close_rcvd) return 0; - - const char *name = NULL; - const char *description = NULL; - pn_data_t *info = NULL; - - if (pn_condition_is_set(&endpoint->condition)) { - name = pn_condition_get_name(&endpoint->condition); - description = pn_condition_get_description(&endpoint->condition); - info = pn_condition_info(&endpoint->condition); - } - - int err = - pn_post_frame(transport, AMQP_FRAME_TYPE, ssn_state->local_channel, - "DL[Io?DL[sSC]]", DETACH, state->local_handle, !link->detached, - (bool)name, ERROR, name, description, info); - if (err) return err; - pni_unmap_local_handle(link); - } - - pn_clear_modified(transport->connection, endpoint); - } - - return 0; -} - -static bool pni_pointful_buffering(pn_transport_t *transport, pn_session_t *session) -{ - if (transport->close_rcvd) return false; - if (!transport->open_rcvd) return true; - - pn_connection_t *conn = transport->connection; - pn_link_t *link = pn_link_head(conn, 0); - while (link) { - if (pn_link_is_sender(link) && pn_link_queued(link) > 0) { - pn_session_t *ssn = link->session; - if (session && session == ssn) { - if ((int32_t) link->state.remote_handle != -2 && - (int16_t) session->state.remote_channel != -2) { - return true; - } - } - } - link = pn_link_next(link, 0); - } - - return false; -} - -static void pni_unmap_local_channel(pn_session_t *ssn) { - // XXX: should really update link state also - pni_delivery_map_clear(&ssn->state.outgoing); - pni_transport_unbind_handles(ssn->state.local_handles, false); - pn_transport_t *transport = ssn->connection->transport; - pn_session_state_t *state = &ssn->state; - uintptr_t channel = state->local_channel; - state->local_channel = -2; - if (pn_hash_get(transport->local_channels, channel)) { - pn_ep_decref(&ssn->endpoint); - } - // may delete session - pn_hash_del(transport->local_channels, channel); -} - -static int pni_process_ssn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) -{ - if (endpoint->type == SESSION) - { - pn_session_t *session = (pn_session_t *) endpoint; - pn_session_state_t *state = &session->state; - if (endpoint->state & PN_LOCAL_CLOSED && (int16_t) state->local_channel >= 0 - && !transport->close_sent) - { - if (pni_pointful_buffering(transport, session)) { - return 0; - } - - const char *name = NULL; - const char *description = NULL; - pn_data_t *info = NULL; - - if (pn_condition_is_set(&endpoint->condition)) { - name = pn_condition_get_name(&endpoint->condition); - description = pn_condition_get_description(&endpoint->condition); - info = pn_condition_info(&endpoint->condition); - } - - int err = pn_post_frame(transport, AMQP_FRAME_TYPE, state->local_channel, "DL[?DL[sSC]]", END, - (bool) name, ERROR, name, description, info); - if (err) return err; - pni_unmap_local_channel(session); - } - - pn_clear_modified(transport->connection, endpoint); - } - return 0; -} - -static int pni_process_conn_teardown(pn_transport_t *transport, pn_endpoint_t *endpoint) -{ - if (endpoint->type == CONNECTION) - { - if (endpoint->state & PN_LOCAL_CLOSED && !transport->close_sent) { - if (pni_pointful_buffering(transport, NULL)) return 0; - int err = pni_post_close(transport, NULL); - if (err) return err; - transport->close_sent = true; - } - - pn_clear_modified(transport->connection, endpoint); - } - return 0; -} - -static int pni_phase(pn_transport_t *transport, int (*phase)(pn_transport_t *, pn_endpoint_t *)) -{ - pn_connection_t *conn = transport->connection; - pn_endpoint_t *endpoint = conn->transport_head; - while (endpoint) - { - pn_endpoint_t *next = endpoint->transport_next; - int err = phase(transport, endpoint); - if (err) return err; - endpoint = next; - } - return 0; -} - -static int pni_process(pn_transport_t *transport) -{ - int err; - if ((err = pni_phase(transport, pni_process_conn_setup))) return err; - if ((err = pni_phase(transport, pni_process_ssn_setup))) return err; - if ((err = pni_phase(transport, pni_process_link_setup))) return err; - if ((err = pni_phase(transport, pni_process_flow_receiver))) return err; - - // XXX: this has to happen two times because we might settle stuff - // on the first pass and create space for more work to be done on the - // second pass - if ((err = pni_phase(transport, pni_process_tpwork))) return err; - if ((err = pni_phase(transport, pni_process_tpwork))) return err; - - if ((err = pni_phase(transport, pni_process_flush_disp))) return err; - - if ((err = pni_phase(transport, pni_process_flow_sender))) return err; - if ((err = pni_phase(transport, pni_process_link_teardown))) return err; - if ((err = pni_phase(transport, pni_process_ssn_teardown))) return err; - if ((err = pni_phase(transport, pni_process_conn_teardown))) return err; - - if (transport->connection->tpwork_head) { - pn_modified(transport->connection, &transport->connection->endpoint, false); - } - - return 0; -} - -#define AMQP_HEADER ("AMQP\x00\x01\x00\x00") - -static void pn_error_amqp(pn_transport_t* transport, unsigned int layer) -{ - if (!transport->close_sent) { - if (!transport->open_sent) { - pn_post_frame(transport, AMQP_FRAME_TYPE, 0, "DL[S]", OPEN, ""); - } - - pni_post_close(transport, &transport->condition); - transport->close_sent = true; - } - transport->halt = true; - transport->done_processing = true; -} - -static ssize_t pn_input_read_amqp_header(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available) -{ - bool eos = pn_transport_capacity(transport)==PN_EOS; - pni_protocol_type_t protocol = pni_sniff_header(bytes, available); - switch (protocol) { - case PNI_PROTOCOL_AMQP1: - if (transport->io_layers[layer] == &amqp_read_header_layer) { - transport->io_layers[layer] = &amqp_layer; - } else { - transport->io_layers[layer] = &amqp_write_header_layer; - } - if (transport->trace & PN_TRACE_FRM) - pn_transport_logf(transport, " <- %s", "AMQP"); - return 8; - case PNI_PROTOCOL_INSUFFICIENT: - if (!eos) return 0; - /* Fallthru */ - default: - break; - } - char quoted[1024]; - pn_quote_data(quoted, 1024, bytes, available); - pn_do_error(transport, "amqp:connection:framing-error", - "%s header mismatch: %s ['%s']%s", "AMQP", pni_protocol_name(protocol), quoted, - !eos ? "" : " (connection aborted)"); - return PN_EOS; -} - -static ssize_t pn_input_read_amqp(pn_transport_t* transport, unsigned int layer, const char* bytes, size_t available) -{ - if (transport->close_rcvd) { - if (available > 0) { - pn_do_error(transport, "amqp:connection:framing-error", "data after close"); - return PN_EOS; - } - } - - if (!transport->close_rcvd && !available) { - pn_do_error(transport, "amqp:connection:framing-error", "connection aborted"); - return PN_EOS; - } - - - ssize_t n = pn_dispatcher_input(transport, bytes, available, true, &transport->halt); - if (n < 0) { - //return pn_error_set(transport->error, n, "dispatch error"); - return PN_EOS; - } else if (transport->close_rcvd) { - return PN_EOS; - } else { - return n; - } -} - -/* process AMQP related timer events */ -static pn_timestamp_t pn_tick_amqp(pn_transport_t* transport, unsigned int layer, pn_timestamp_t now) -{ - pn_timestamp_t timeout = 0; - - if (transport->local_idle_timeout) { - if (transport->dead_remote_deadline == 0 || - transport->last_bytes_input != transport->bytes_input) { - transport->dead_remote_deadlin
<TRUNCATED> --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
