http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/core/dispatcher.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/dispatcher.c b/proton-c/src/core/dispatcher.c new file mode 100644 index 0000000..36f8cc9 --- /dev/null +++ b/proton-c/src/core/dispatcher.c @@ -0,0 +1,158 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "dispatcher.h" + +#include "framing.h" +#include "protocol.h" +#include "engine-internal.h" + +#include "dispatch_actions.h" + +int pni_bad_frame(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { + pn_transport_logf(transport, "Error dispatching frame: type: %d: Unknown performative", frame_type); + return PN_ERR; +} + +int pni_bad_frame_type(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { + pn_transport_logf(transport, "Error dispatching frame: Unknown frame type: %d", frame_type); + return PN_ERR; +} + +// We could use a table based approach here if we needed to dynamically +// add new performatives +static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) +{ + pn_action_t *action; + switch (frame_type) { + case AMQP_FRAME_TYPE: + /* Regular AMQP fames */ + switch (lcode) { + case OPEN: action = pn_do_open; break; + case BEGIN: action = pn_do_begin; break; + case ATTACH: action = pn_do_attach; break; + case FLOW: action = pn_do_flow; break; + case TRANSFER: action = pn_do_transfer; break; + case DISPOSITION: action = pn_do_disposition; break; + case DETACH: action = pn_do_detach; break; + case END: action = pn_do_end; break; + case CLOSE: action = pn_do_close; break; + default: action = pni_bad_frame; break; + }; + break; + case SASL_FRAME_TYPE: + /* SASL frames */ + switch (lcode) { + case SASL_MECHANISMS: action = pn_do_mechanisms; break; + case SASL_INIT: action = pn_do_init; break; + case SASL_CHALLENGE: action = pn_do_challenge; break; + case SASL_RESPONSE: action = pn_do_response; break; + case SASL_OUTCOME: action = pn_do_outcome; break; + default: action = pni_bad_frame; break; + }; + break; + default: action = pni_bad_frame_type; break; + }; + return action(transport, frame_type, channel, args, payload); +} + +static int pni_dispatch_frame(pn_transport_t * transport, pn_data_t *args, pn_frame_t frame) +{ + if (frame.size == 0) { // ignore null frames + if (transport->trace & PN_TRACE_FRM) + pn_transport_logf(transport, "%u <- (EMPTY FRAME)", frame.channel); + return 0; + } + + ssize_t dsize = pn_data_decode(args, frame.payload, frame.size); + if (dsize < 0) { + pn_string_format(transport->scratch, + "Error decoding frame: %s %s\n", pn_code(dsize), + pn_error_text(pn_data_error(args))); + pn_quote(transport->scratch, frame.payload, frame.size); + pn_transport_log(transport, pn_string_get(transport->scratch)); + return dsize; + } + + uint8_t frame_type = frame.type; + uint16_t channel = frame.channel; + // XXX: assuming numeric - + // if we get a symbol we should map it to the numeric value and dispatch on that + uint64_t lcode; + bool scanned; + int e = pn_data_scan(args, "D?L.", &scanned, &lcode); + if (e) { + pn_transport_log(transport, "Scan error"); + return e; + } + if (!scanned) { + pn_transport_log(transport, "Error dispatching frame"); + return PN_ERR; + } + size_t payload_size = frame.size - dsize; + const char *payload_mem = payload_size ? frame.payload + dsize : NULL; + pn_bytes_t payload = {payload_size, payload_mem}; + + pn_do_trace(transport, channel, IN, args, payload_mem, payload_size); + + int err = pni_dispatch_action(transport, lcode, frame_type, channel, args, &payload); + + pn_data_clear(args); + + return err; +} + +ssize_t pn_dispatcher_input(pn_transport_t *transport, const char *bytes, size_t available, bool batch, bool *halt) +{ + size_t read = 0; + + while (available && !*halt) { + pn_frame_t frame; + + ssize_t n = pn_read_frame(&frame, bytes + read, available, transport->local_max_frame); + if (n > 0) { + read += n; + available -= n; + transport->input_frames_ct += 1; + int e = pni_dispatch_frame(transport, transport->args, frame); + if (e) return e; + } else if (n < 0) { + pn_do_error(transport, "amqp:connection:framing-error", "malformed frame"); + return n; + } else { + break; + } + + if (!batch) break; + } + + return read; +} + +ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size) +{ + int n = transport->available < size ? transport->available : size; + memmove(bytes, transport->output, n); + memmove(transport->output, transport->output + n, transport->available - n); + transport->available -= n; + // XXX: need to check for errors + return n; +}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/core/dispatcher.h ---------------------------------------------------------------------- diff --git a/proton-c/src/core/dispatcher.h b/proton-c/src/core/dispatcher.h new file mode 100644 index 0000000..29881b5 --- /dev/null +++ b/proton-c/src/core/dispatcher.h @@ -0,0 +1,37 @@ +#ifndef _PROTON_DISPATCHER_H +#define _PROTON_DISPATCHER_H 1 + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#ifndef __cplusplus +#include <stdbool.h> +#endif + +#include "proton/codec.h" +#include "proton/types.h" + +typedef int (pn_action_t)(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); + +ssize_t pn_dispatcher_input(pn_transport_t* transport, const char* bytes, size_t available, bool batch, bool* halt); +ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size); + +#endif /* dispatcher.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/core/encoder.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/encoder.c b/proton-c/src/core/encoder.c new file mode 100644 index 0000000..f8145fc --- /dev/null +++ b/proton-c/src/core/encoder.c @@ -0,0 +1,383 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/error.h> +#include <proton/object.h> +#include <proton/codec.h> +#include "encodings.h" +#include "encoder.h" + +#include <string.h> + +#include "data.h" + +struct pn_encoder_t { + char *output; + size_t size; + char *position; + pn_error_t *error; +}; + +static void pn_encoder_initialize(void *obj) +{ + pn_encoder_t *encoder = (pn_encoder_t *) obj; + encoder->output = NULL; + encoder->size = 0; + encoder->position = NULL; + encoder->error = pn_error(); +} + +static void pn_encoder_finalize(void *obj) { + pn_encoder_t *encoder = (pn_encoder_t *) obj; + pn_error_free(encoder->error); +} + +#define pn_encoder_hashcode NULL +#define pn_encoder_compare NULL +#define pn_encoder_inspect NULL + +pn_encoder_t *pn_encoder() +{ + static const pn_class_t clazz = PN_CLASS(pn_encoder); + return (pn_encoder_t *) pn_class_new(&clazz, sizeof(pn_encoder_t)); +} + +static uint8_t pn_type2code(pn_encoder_t *encoder, pn_type_t type) +{ + switch (type) + { + case PN_NULL: return PNE_NULL; + case PN_BOOL: return PNE_BOOLEAN; + case PN_UBYTE: return PNE_UBYTE; + case PN_BYTE: return PNE_BYTE; + case PN_USHORT: return PNE_USHORT; + case PN_SHORT: return PNE_SHORT; + case PN_UINT: return PNE_UINT; + case PN_INT: return PNE_INT; + case PN_CHAR: return PNE_UTF32; + case PN_FLOAT: return PNE_FLOAT; + case PN_LONG: return PNE_LONG; + case PN_TIMESTAMP: return PNE_MS64; + case PN_DOUBLE: return PNE_DOUBLE; + case PN_DECIMAL32: return PNE_DECIMAL32; + case PN_DECIMAL64: return PNE_DECIMAL64; + case PN_DECIMAL128: return PNE_DECIMAL128; + case PN_UUID: return PNE_UUID; + case PN_ULONG: return PNE_ULONG; + case PN_BINARY: return PNE_VBIN32; + case PN_STRING: return PNE_STR32_UTF8; + case PN_SYMBOL: return PNE_SYM32; + case PN_LIST: return PNE_LIST32; + case PN_ARRAY: return PNE_ARRAY32; + case PN_MAP: return PNE_MAP32; + case PN_DESCRIBED: return PNE_DESCRIPTOR; + default: + return pn_error_format(encoder->error, PN_ERR, "not a value type: %u\n", type); + } +} + +static uint8_t pn_node2code(pn_encoder_t *encoder, pni_node_t *node) +{ + switch (node->atom.type) { + case PN_LONG: + if (-128 <= node->atom.u.as_long && node->atom.u.as_long <= 127) { + return PNE_SMALLLONG; + } else { + return PNE_LONG; + } + case PN_INT: + if (-128 <= node->atom.u.as_int && node->atom.u.as_int <= 127) { + return PNE_SMALLINT; + } else { + return PNE_INT; + } + case PN_ULONG: + if (node->atom.u.as_ulong < 256) { + return PNE_SMALLULONG; + } else { + return PNE_ULONG; + } + case PN_UINT: + if (node->atom.u.as_uint < 256) { + return PNE_SMALLUINT; + } else { + return PNE_UINT; + } + case PN_BOOL: + if (node->atom.u.as_bool) { + return PNE_TRUE; + } else { + return PNE_FALSE; + } + case PN_STRING: + if (node->atom.u.as_bytes.size < 256) { + return PNE_STR8_UTF8; + } else { + return PNE_STR32_UTF8; + } + case PN_SYMBOL: + if (node->atom.u.as_bytes.size < 256) { + return PNE_SYM8; + } else { + return PNE_SYM32; + } + case PN_BINARY: + if (node->atom.u.as_bytes.size < 256) { + return PNE_VBIN8; + } else { + return PNE_VBIN32; + } + default: + return pn_type2code(encoder, node->atom.type); + } +} + +static size_t pn_encoder_remaining(pn_encoder_t *encoder) { + char * end = encoder->output + encoder->size; + if (end > encoder->position) + return end - encoder->position; + else + return 0; +} + +static inline void pn_encoder_writef8(pn_encoder_t *encoder, uint8_t value) +{ + if (pn_encoder_remaining(encoder)) { + encoder->position[0] = value; + } + encoder->position++; +} + +static inline void pn_encoder_writef16(pn_encoder_t *encoder, uint16_t value) +{ + if (pn_encoder_remaining(encoder) >= 2) { + encoder->position[0] = 0xFF & (value >> 8); + encoder->position[1] = 0xFF & (value ); + } + encoder->position += 2; +} + +static inline void pn_encoder_writef32(pn_encoder_t *encoder, uint32_t value) +{ + if (pn_encoder_remaining(encoder) >= 4) { + encoder->position[0] = 0xFF & (value >> 24); + encoder->position[1] = 0xFF & (value >> 16); + encoder->position[2] = 0xFF & (value >> 8); + encoder->position[3] = 0xFF & (value ); + } + encoder->position += 4; +} + +static inline void pn_encoder_writef64(pn_encoder_t *encoder, uint64_t value) { + if (pn_encoder_remaining(encoder) >= 8) { + encoder->position[0] = 0xFF & (value >> 56); + encoder->position[1] = 0xFF & (value >> 48); + encoder->position[2] = 0xFF & (value >> 40); + encoder->position[3] = 0xFF & (value >> 32); + encoder->position[4] = 0xFF & (value >> 24); + encoder->position[5] = 0xFF & (value >> 16); + encoder->position[6] = 0xFF & (value >> 8); + encoder->position[7] = 0xFF & (value ); + } + encoder->position += 8; +} + +static inline void pn_encoder_writef128(pn_encoder_t *encoder, char *value) { + if (pn_encoder_remaining(encoder) >= 16) { + memmove(encoder->position, value, 16); + } + encoder->position += 16; +} + +static inline void pn_encoder_writev8(pn_encoder_t *encoder, const pn_bytes_t *value) +{ + pn_encoder_writef8(encoder, value->size); + if (pn_encoder_remaining(encoder) >= value->size) + memmove(encoder->position, value->start, value->size); + encoder->position += value->size; +} + +static inline void pn_encoder_writev32(pn_encoder_t *encoder, const pn_bytes_t *value) +{ + pn_encoder_writef32(encoder, value->size); + if (pn_encoder_remaining(encoder) >= value->size) + memmove(encoder->position, value->start, value->size); + encoder->position += value->size; +} + +/* True if node is an element of an array - not the descriptor. */ +static bool pn_is_in_array(pn_data_t *data, pni_node_t *parent, pni_node_t *node) { + return (parent && parent->atom.type == PN_ARRAY) /* In array */ + && !(parent->described && !node->prev); /* Not the descriptor */ +} + +/** True if node is the first element of an array, not the descriptor. + *@pre pn_is_in_array(data, parent, node) + */ +static bool pn_is_first_in_array(pn_data_t *data, pni_node_t *parent, pni_node_t *node) { + if (!node->prev) return !parent->described; /* First node */ + return parent->described && (!pn_data_node(data, node->prev)->prev); +} + +typedef union { + uint32_t i; + uint32_t a[2]; + uint64_t l; + float f; + double d; +} conv_t; + +static int pni_encoder_enter(void *ctx, pn_data_t *data, pni_node_t *node) +{ + pn_encoder_t *encoder = (pn_encoder_t *) ctx; + pni_node_t *parent = pn_data_node(data, node->parent); + pn_atom_t *atom = &node->atom; + uint8_t code; + conv_t c; + + /** In an array we don't write the code before each element, only the first. */ + if (pn_is_in_array(data, parent, node)) { + code = pn_type2code(encoder, parent->type); + if (pn_is_first_in_array(data, parent, node)) { + pn_encoder_writef8(encoder, code); + } + } else { + code = pn_node2code(encoder, node); + pn_encoder_writef8(encoder, code); + } + + switch (code) { + case PNE_DESCRIPTOR: + case PNE_NULL: + case PNE_TRUE: + case PNE_FALSE: return 0; + case PNE_BOOLEAN: pn_encoder_writef8(encoder, atom->u.as_bool); return 0; + case PNE_UBYTE: pn_encoder_writef8(encoder, atom->u.as_ubyte); return 0; + case PNE_BYTE: pn_encoder_writef8(encoder, atom->u.as_byte); return 0; + case PNE_USHORT: pn_encoder_writef16(encoder, atom->u.as_ushort); return 0; + case PNE_SHORT: pn_encoder_writef16(encoder, atom->u.as_short); return 0; + case PNE_UINT0: return 0; + case PNE_SMALLUINT: pn_encoder_writef8(encoder, atom->u.as_uint); return 0; + case PNE_UINT: pn_encoder_writef32(encoder, atom->u.as_uint); return 0; + case PNE_SMALLINT: pn_encoder_writef8(encoder, atom->u.as_int); return 0; + case PNE_INT: pn_encoder_writef32(encoder, atom->u.as_int); return 0; + case PNE_UTF32: pn_encoder_writef32(encoder, atom->u.as_char); return 0; + case PNE_ULONG: pn_encoder_writef64(encoder, atom->u.as_ulong); return 0; + case PNE_SMALLULONG: pn_encoder_writef8(encoder, atom->u.as_ulong); return 0; + case PNE_LONG: pn_encoder_writef64(encoder, atom->u.as_long); return 0; + case PNE_SMALLLONG: pn_encoder_writef8(encoder, atom->u.as_long); return 0; + case PNE_MS64: pn_encoder_writef64(encoder, atom->u.as_timestamp); return 0; + case PNE_FLOAT: c.f = atom->u.as_float; pn_encoder_writef32(encoder, c.i); return 0; + case PNE_DOUBLE: c.d = atom->u.as_double; pn_encoder_writef64(encoder, c.l); return 0; + case PNE_DECIMAL32: pn_encoder_writef32(encoder, atom->u.as_decimal32); return 0; + case PNE_DECIMAL64: pn_encoder_writef64(encoder, atom->u.as_decimal64); return 0; + case PNE_DECIMAL128: pn_encoder_writef128(encoder, atom->u.as_decimal128.bytes); return 0; + case PNE_UUID: pn_encoder_writef128(encoder, atom->u.as_uuid.bytes); return 0; + case PNE_VBIN8: pn_encoder_writev8(encoder, &atom->u.as_bytes); return 0; + case PNE_VBIN32: pn_encoder_writev32(encoder, &atom->u.as_bytes); return 0; + case PNE_STR8_UTF8: pn_encoder_writev8(encoder, &atom->u.as_bytes); return 0; + case PNE_STR32_UTF8: pn_encoder_writev32(encoder, &atom->u.as_bytes); return 0; + case PNE_SYM8: pn_encoder_writev8(encoder, &atom->u.as_bytes); return 0; + case PNE_SYM32: pn_encoder_writev32(encoder, &atom->u.as_bytes); return 0; + case PNE_ARRAY32: + node->start = encoder->position; + node->small = false; + // we'll backfill the size on exit + encoder->position += 4; + pn_encoder_writef32(encoder, node->described ? node->children - 1 : node->children); + if (node->described) + pn_encoder_writef8(encoder, 0); + return 0; + case PNE_LIST32: + case PNE_MAP32: + node->start = encoder->position; + node->small = false; + // we'll backfill the size later + encoder->position += 4; + pn_encoder_writef32(encoder, node->children); + return 0; + default: + return pn_error_format(data->error, PN_ERR, "unrecognized encoding: %u", code); + } +} + +#include <stdio.h> + +static int pni_encoder_exit(void *ctx, pn_data_t *data, pni_node_t *node) +{ + pn_encoder_t *encoder = (pn_encoder_t *) ctx; + char *pos; + + switch (node->atom.type) { + case PN_ARRAY: + if ((node->described && node->children == 1) || (!node->described && node->children == 0)) { + pn_encoder_writef8(encoder, pn_type2code(encoder, node->type)); + } + // Fallthrough + case PN_LIST: + case PN_MAP: + pos = encoder->position; + encoder->position = node->start; + if (node->small) { + // backfill size + size_t size = pos - node->start - 1; + pn_encoder_writef8(encoder, size); + } else { + // backfill size + size_t size = pos - node->start - 4; + pn_encoder_writef32(encoder, size); + } + encoder->position = pos; + return 0; + default: + return 0; + } +} + +ssize_t pn_encoder_encode(pn_encoder_t *encoder, pn_data_t *src, char *dst, size_t size) +{ + encoder->output = dst; + encoder->position = dst; + encoder->size = size; + + int err = pni_data_traverse(src, pni_encoder_enter, pni_encoder_exit, encoder); + if (err) return err; + size_t encoded = encoder->position - encoder->output; + if (encoded > size) { + pn_error_format(pn_data_error(src), PN_OVERFLOW, "not enough space to encode"); + return PN_OVERFLOW; + } + return (ssize_t)encoded; +} + +ssize_t pn_encoder_size(pn_encoder_t *encoder, pn_data_t *src) +{ + encoder->output = 0; + encoder->position = 0; + encoder->size = 0; + + pn_handle_t save = pn_data_point(src); + int err = pni_data_traverse(src, pni_encoder_enter, pni_encoder_exit, encoder); + pn_data_restore(src, save); + + if (err) return err; + return encoder->position - encoder->output; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/core/encoder.h ---------------------------------------------------------------------- diff --git a/proton-c/src/core/encoder.h b/proton-c/src/core/encoder.h new file mode 100644 index 0000000..20876cb --- /dev/null +++ b/proton-c/src/core/encoder.h @@ -0,0 +1,31 @@ +#ifndef _PROTON_ENCODER_H +#define _PROTON_ENCODER_H 1 + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +typedef struct pn_encoder_t pn_encoder_t; + +pn_encoder_t *pn_encoder(void); +ssize_t pn_encoder_encode(pn_encoder_t *encoder, pn_data_t *src, char *dst, size_t size); +ssize_t pn_encoder_size(pn_encoder_t *encoder, pn_data_t *src); + +#endif /* encoder.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/a5850716/proton-c/src/core/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/core/engine-internal.h b/proton-c/src/core/engine-internal.h new file mode 100644 index 0000000..fdaf272 --- /dev/null +++ b/proton-c/src/core/engine-internal.h @@ -0,0 +1,375 @@ +#ifndef _PROTON_ENGINE_INTERNAL_H +#define _PROTON_ENGINE_INTERNAL_H 1 + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <proton/object.h> +#include <proton/engine.h> +#include <proton/types.h> + +#include "buffer.h" +#include "dispatcher.h" +#include "util.h" + +typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER} pn_endpoint_type_t; + +typedef struct pn_endpoint_t pn_endpoint_t; + +struct pn_condition_t { + pn_string_t *name; + pn_string_t *description; + pn_data_t *info; +}; + +struct pn_endpoint_t { + pn_endpoint_type_t type; + pn_state_t state; + pn_error_t *error; + pn_condition_t condition; + pn_condition_t remote_condition; + pn_endpoint_t *endpoint_next; + pn_endpoint_t *endpoint_prev; + pn_endpoint_t *transport_next; + pn_endpoint_t *transport_prev; + int refcount; // when this hits zero we generate a final event + bool modified; + bool freed; + bool referenced; +}; + +typedef struct { + pn_sequence_t id; + bool sent; + bool init; +} pn_delivery_state_t; + +typedef struct { + pn_sequence_t next; + pn_hash_t *deliveries; +} pn_delivery_map_t; + +typedef struct { + // XXX: stop using negative numbers + uint32_t local_handle; + uint32_t remote_handle; + pn_sequence_t delivery_count; + pn_sequence_t link_credit; +} pn_link_state_t; + +typedef struct { + // XXX: stop using negative numbers + uint16_t local_channel; + uint16_t remote_channel; + bool incoming_init; + pn_delivery_map_t incoming; + pn_delivery_map_t outgoing; + pn_sequence_t incoming_transfer_count; + pn_sequence_t incoming_window; + pn_sequence_t remote_incoming_window; + pn_sequence_t outgoing_transfer_count; + pn_sequence_t outgoing_window; + pn_hash_t *local_handles; + pn_hash_t *remote_handles; + + uint64_t disp_code; + bool disp_settled; + bool disp_type; + pn_sequence_t disp_first; + pn_sequence_t disp_last; + bool disp; +} pn_session_state_t; + +typedef struct pn_io_layer_t { + ssize_t (*process_input)(struct pn_transport_t *transport, unsigned int layer, const char *, size_t); + ssize_t (*process_output)(struct pn_transport_t *transport, unsigned int layer, char *, size_t); + void (*handle_error)(struct pn_transport_t* transport, unsigned int layer); + pn_timestamp_t (*process_tick)(struct pn_transport_t *transport, unsigned int layer, pn_timestamp_t); + size_t (*buffered_output)(struct pn_transport_t *transport); // how much output is held +} pn_io_layer_t; + +extern const pn_io_layer_t pni_passthru_layer; +extern const pn_io_layer_t ssl_layer; +extern const pn_io_layer_t sasl_header_layer; +extern const pn_io_layer_t sasl_write_header_layer; + +// Bit flag defines for the protocol layers +typedef uint8_t pn_io_layer_flags_t; +#define LAYER_NONE 0 +#define LAYER_AMQP1 1 +#define LAYER_AMQPSASL 2 +#define LAYER_AMQPSSL 4 +#define LAYER_SSL 8 + +typedef struct pni_sasl_t pni_sasl_t; +typedef struct pni_ssl_t pni_ssl_t; + +struct pn_transport_t { + pn_tracer_t tracer; + pni_sasl_t *sasl; + pni_ssl_t *ssl; + pn_connection_t *connection; // reference counted + char *remote_container; + char *remote_hostname; + pn_data_t *remote_offered_capabilities; + pn_data_t *remote_desired_capabilities; + pn_data_t *remote_properties; + pn_data_t *disp_data; + //#define PN_DEFAULT_MAX_FRAME_SIZE (16*1024) +#define PN_DEFAULT_MAX_FRAME_SIZE (0) /* for now, allow unlimited size */ + uint32_t local_max_frame; + uint32_t remote_max_frame; + pn_condition_t remote_condition; + pn_condition_t condition; + pn_error_t *error; + +#define PN_IO_LAYER_CT 3 + const pn_io_layer_t *io_layers[PN_IO_LAYER_CT]; + + /* dead remote detection */ + pn_millis_t local_idle_timeout; + pn_millis_t remote_idle_timeout; + pn_timestamp_t dead_remote_deadline; + uint64_t last_bytes_input; + + /* keepalive */ + pn_timestamp_t keepalive_deadline; + uint64_t last_bytes_output; + + pn_hash_t *local_channels; + pn_hash_t *remote_channels; + + + /* scratch area */ + pn_string_t *scratch; + pn_data_t *args; + pn_data_t *output_args; + pn_buffer_t *frame; // frame under construction + // Temporary + size_t capacity; + size_t available; /* number of raw bytes pending output */ + char *output; + + /* statistics */ + uint64_t bytes_input; + uint64_t bytes_output; + uint64_t output_frames_ct; + uint64_t input_frames_ct; + + /* output buffered for send */ + size_t output_size; + size_t output_pending; + char *output_buf; + + /* input from peer */ + size_t input_size; + size_t input_pending; + char *input_buf; + + pn_record_t *context; + + pn_trace_t trace; + + /* + * The maximum channel number can be constrained in several ways: + * 1. an unchangeable limit imposed by this library code + * 2. a limit imposed by the remote peer when the connection is opened, + * which this app must honor + * 3. a limit imposed by this app, which may be raised and lowered + * until the OPEN frame is sent. + * These constraints are all summed up in channel_max, below. + */ + #define PN_IMPL_CHANNEL_MAX 32767 + uint16_t local_channel_max; + uint16_t remote_channel_max; + uint16_t channel_max; + + pn_io_layer_flags_t allowed_layers; + pn_io_layer_flags_t present_layers; + + bool freed; + bool open_sent; + bool open_rcvd; + bool close_sent; + bool close_rcvd; + bool tail_closed; // input stream closed by driver + bool head_closed; + bool done_processing; // if true, don't call pn_process again + bool posted_idle_timeout; + bool server; + bool halt; + bool auth_required; + bool authenticated; + bool encryption_required; + + bool referenced; +}; + +struct pn_connection_t { + pn_endpoint_t endpoint; + pn_endpoint_t *endpoint_head; + pn_endpoint_t *endpoint_tail; + pn_endpoint_t *transport_head; // reference counted + pn_endpoint_t *transport_tail; + pn_list_t *sessions; + pn_list_t *freed; + pn_transport_t *transport; + pn_delivery_t *work_head; + pn_delivery_t *work_tail; + pn_delivery_t *tpwork_head; // reference counted + pn_delivery_t *tpwork_tail; + pn_string_t *container; + pn_string_t *hostname; + pn_string_t *auth_user; + pn_string_t *auth_password; + pn_data_t *offered_capabilities; + pn_data_t *desired_capabilities; + pn_data_t *properties; + pn_collector_t *collector; + pn_record_t *context; + pn_list_t *delivery_pool; +}; + +struct pn_session_t { + pn_endpoint_t endpoint; + pn_connection_t *connection; // reference counted + pn_list_t *links; + pn_list_t *freed; + pn_record_t *context; + size_t incoming_capacity; + pn_sequence_t incoming_bytes; + pn_sequence_t outgoing_bytes; + pn_sequence_t incoming_deliveries; + pn_sequence_t outgoing_deliveries; + pn_sequence_t outgoing_window; + pn_session_state_t state; +}; + +struct pn_terminus_t { + pn_string_t *address; + pn_data_t *properties; + pn_data_t *capabilities; + pn_data_t *outcomes; + pn_data_t *filter; + pn_durability_t durability; + pn_expiry_policy_t expiry_policy; + pn_seconds_t timeout; + pn_terminus_type_t type; + pn_distribution_mode_t distribution_mode; + bool dynamic; +}; + +struct pn_link_t { + pn_endpoint_t endpoint; + pn_terminus_t source; + pn_terminus_t target; + pn_terminus_t remote_source; + pn_terminus_t remote_target; + pn_link_state_t state; + pn_string_t *name; + pn_session_t *session; // reference counted + pn_delivery_t *unsettled_head; + pn_delivery_t *unsettled_tail; + pn_delivery_t *current; + pn_record_t *context; + size_t unsettled_count; + pn_sequence_t available; + pn_sequence_t credit; + pn_sequence_t queued; + int drained; // number of drained credits + uint8_t snd_settle_mode; + uint8_t rcv_settle_mode; + uint8_t remote_snd_settle_mode; + uint8_t remote_rcv_settle_mode; + bool drain_flag_mode; // receiver only + bool drain; + bool detached; +}; + +struct pn_disposition_t { + pn_condition_t condition; + uint64_t type; + pn_data_t *data; + pn_data_t *annotations; + uint64_t section_offset; + uint32_t section_number; + bool failed; + bool undeliverable; + bool settled; +}; + +struct pn_delivery_t { + pn_disposition_t local; + pn_disposition_t remote; + pn_link_t *link; // reference counted + pn_buffer_t *tag; + pn_delivery_t *unsettled_next; + pn_delivery_t *unsettled_prev; + pn_delivery_t *work_next; + pn_delivery_t *work_prev; + pn_delivery_t *tpwork_next; + pn_delivery_t *tpwork_prev; + pn_delivery_state_t state; + pn_buffer_t *bytes; + pn_record_t *context; + bool updated; + bool settled; // tracks whether we're in the unsettled list or not + bool work; + bool tpwork; + bool done; + bool referenced; +}; + +#define PN_SET_LOCAL(OLD, NEW) \ + (OLD) = ((OLD) & PN_REMOTE_MASK) | (NEW) + +#define PN_SET_REMOTE(OLD, NEW) \ + (OLD) = ((OLD) & PN_LOCAL_MASK) | (NEW) + +void pn_link_dump(pn_link_t *link); + +void pn_dump(pn_connection_t *conn); +void pn_transport_sasl_init(pn_transport_t *transport); + +void pn_condition_init(pn_condition_t *condition); +void pn_condition_tini(pn_condition_t *condition); +void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit); +void pn_real_settle(pn_delivery_t *delivery); // will free delivery if link is freed +void pn_clear_tpwork(pn_delivery_t *delivery); +void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery); +void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint); +void pn_connection_bound(pn_connection_t *conn); +void pn_connection_unbound(pn_connection_t *conn); +int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...); +void pn_set_error_layer(pn_transport_t *transport); +void pn_session_unbound(pn_session_t* ssn); +void pn_link_unbound(pn_link_t* link); +void pn_ep_incref(pn_endpoint_t *endpoint); +void pn_ep_decref(pn_endpoint_t *endpoint); + +int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const char *fmt, ...); + +typedef enum {IN, OUT} pn_dir_t; + +void pn_do_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir, + pn_data_t *args, const char *payload, size_t size); + +#endif /* engine-internal.h */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
