http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/core/dispatcher.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/dispatcher.c b/proton-c/src/core/dispatcher.c deleted file mode 100644 index 36f8cc9..0000000 --- a/proton-c/src/core/dispatcher.c +++ /dev/null @@ -1,158 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "dispatcher.h" - -#include "framing.h" -#include "protocol.h" -#include "engine-internal.h" - -#include "dispatch_actions.h" - -int pni_bad_frame(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pn_transport_logf(transport, "Error dispatching frame: type: %d: Unknown performative", frame_type); - return PN_ERR; -} - -int pni_bad_frame_type(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) { - pn_transport_logf(transport, "Error dispatching frame: Unknown frame type: %d", frame_type); - return PN_ERR; -} - -// We could use a table based approach here if we needed to dynamically -// add new performatives -static inline int pni_dispatch_action(pn_transport_t* transport, uint64_t lcode, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload) -{ - pn_action_t *action; - switch (frame_type) { - case AMQP_FRAME_TYPE: - /* Regular AMQP fames */ - switch (lcode) { - case OPEN: action = pn_do_open; break; - case BEGIN: action = pn_do_begin; break; - case ATTACH: action = pn_do_attach; break; - case FLOW: action = pn_do_flow; break; - case TRANSFER: action = pn_do_transfer; break; - case DISPOSITION: action = pn_do_disposition; break; - case DETACH: action = pn_do_detach; break; - case END: action = pn_do_end; break; - case CLOSE: action = pn_do_close; break; - default: action = pni_bad_frame; break; - }; - break; - case SASL_FRAME_TYPE: - /* SASL frames */ - switch (lcode) { - case SASL_MECHANISMS: action = pn_do_mechanisms; break; - case SASL_INIT: action = pn_do_init; break; - case SASL_CHALLENGE: action = pn_do_challenge; break; - case SASL_RESPONSE: action = pn_do_response; break; - case SASL_OUTCOME: action = pn_do_outcome; break; - default: action = pni_bad_frame; break; - }; - break; - default: action = pni_bad_frame_type; break; - }; - return action(transport, frame_type, channel, args, payload); -} - -static int pni_dispatch_frame(pn_transport_t * transport, pn_data_t *args, pn_frame_t frame) -{ - if (frame.size == 0) { // ignore null frames - if (transport->trace & PN_TRACE_FRM) - pn_transport_logf(transport, "%u <- (EMPTY FRAME)", frame.channel); - return 0; - } - - ssize_t dsize = pn_data_decode(args, frame.payload, frame.size); - if (dsize < 0) { - pn_string_format(transport->scratch, - "Error decoding frame: %s %s\n", pn_code(dsize), - pn_error_text(pn_data_error(args))); - pn_quote(transport->scratch, frame.payload, frame.size); - pn_transport_log(transport, pn_string_get(transport->scratch)); - return dsize; - } - - uint8_t frame_type = frame.type; - uint16_t channel = frame.channel; - // XXX: assuming numeric - - // if we get a symbol we should map it to the numeric value and dispatch on that - uint64_t lcode; - bool scanned; - int e = pn_data_scan(args, "D?L.", &scanned, &lcode); - if (e) { - pn_transport_log(transport, "Scan error"); - return e; - } - if (!scanned) { - pn_transport_log(transport, "Error dispatching frame"); - return PN_ERR; - } - size_t payload_size = frame.size - dsize; - const char *payload_mem = payload_size ? frame.payload + dsize : NULL; - pn_bytes_t payload = {payload_size, payload_mem}; - - pn_do_trace(transport, channel, IN, args, payload_mem, payload_size); - - int err = pni_dispatch_action(transport, lcode, frame_type, channel, args, &payload); - - pn_data_clear(args); - - return err; -} - -ssize_t pn_dispatcher_input(pn_transport_t *transport, const char *bytes, size_t available, bool batch, bool *halt) -{ - size_t read = 0; - - while (available && !*halt) { - pn_frame_t frame; - - ssize_t n = pn_read_frame(&frame, bytes + read, available, transport->local_max_frame); - if (n > 0) { - read += n; - available -= n; - transport->input_frames_ct += 1; - int e = pni_dispatch_frame(transport, transport->args, frame); - if (e) return e; - } else if (n < 0) { - pn_do_error(transport, "amqp:connection:framing-error", "malformed frame"); - return n; - } else { - break; - } - - if (!batch) break; - } - - return read; -} - -ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size) -{ - int n = transport->available < size ? transport->available : size; - memmove(bytes, transport->output, n); - memmove(transport->output, transport->output + n, transport->available - n); - transport->available -= n; - // XXX: need to check for errors - return n; -}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/core/dispatcher.h ---------------------------------------------------------------------- diff --git a/proton-c/src/core/dispatcher.h b/proton-c/src/core/dispatcher.h deleted file mode 100644 index 29881b5..0000000 --- a/proton-c/src/core/dispatcher.h +++ /dev/null @@ -1,37 +0,0 @@ -#ifndef _PROTON_DISPATCHER_H -#define _PROTON_DISPATCHER_H 1 - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#ifndef __cplusplus -#include <stdbool.h> -#endif - -#include "proton/codec.h" -#include "proton/types.h" - -typedef int (pn_action_t)(pn_transport_t *transport, uint8_t frame_type, uint16_t channel, pn_data_t *args, const pn_bytes_t *payload); - -ssize_t pn_dispatcher_input(pn_transport_t* transport, const char* bytes, size_t available, bool batch, bool* halt); -ssize_t pn_dispatcher_output(pn_transport_t *transport, char *bytes, size_t size); - -#endif /* dispatcher.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/core/encoder.c ---------------------------------------------------------------------- diff --git a/proton-c/src/core/encoder.c b/proton-c/src/core/encoder.c deleted file mode 100644 index f8145fc..0000000 --- a/proton-c/src/core/encoder.c +++ /dev/null @@ -1,383 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <proton/error.h> -#include <proton/object.h> -#include <proton/codec.h> -#include "encodings.h" -#include "encoder.h" - -#include <string.h> - -#include "data.h" - -struct pn_encoder_t { - char *output; - size_t size; - char *position; - pn_error_t *error; -}; - -static void pn_encoder_initialize(void *obj) -{ - pn_encoder_t *encoder = (pn_encoder_t *) obj; - encoder->output = NULL; - encoder->size = 0; - encoder->position = NULL; - encoder->error = pn_error(); -} - -static void pn_encoder_finalize(void *obj) { - pn_encoder_t *encoder = (pn_encoder_t *) obj; - pn_error_free(encoder->error); -} - -#define pn_encoder_hashcode NULL -#define pn_encoder_compare NULL -#define pn_encoder_inspect NULL - -pn_encoder_t *pn_encoder() -{ - static const pn_class_t clazz = PN_CLASS(pn_encoder); - return (pn_encoder_t *) pn_class_new(&clazz, sizeof(pn_encoder_t)); -} - -static uint8_t pn_type2code(pn_encoder_t *encoder, pn_type_t type) -{ - switch (type) - { - case PN_NULL: return PNE_NULL; - case PN_BOOL: return PNE_BOOLEAN; - case PN_UBYTE: return PNE_UBYTE; - case PN_BYTE: return PNE_BYTE; - case PN_USHORT: return PNE_USHORT; - case PN_SHORT: return PNE_SHORT; - case PN_UINT: return PNE_UINT; - case PN_INT: return PNE_INT; - case PN_CHAR: return PNE_UTF32; - case PN_FLOAT: return PNE_FLOAT; - case PN_LONG: return PNE_LONG; - case PN_TIMESTAMP: return PNE_MS64; - case PN_DOUBLE: return PNE_DOUBLE; - case PN_DECIMAL32: return PNE_DECIMAL32; - case PN_DECIMAL64: return PNE_DECIMAL64; - case PN_DECIMAL128: return PNE_DECIMAL128; - case PN_UUID: return PNE_UUID; - case PN_ULONG: return PNE_ULONG; - case PN_BINARY: return PNE_VBIN32; - case PN_STRING: return PNE_STR32_UTF8; - case PN_SYMBOL: return PNE_SYM32; - case PN_LIST: return PNE_LIST32; - case PN_ARRAY: return PNE_ARRAY32; - case PN_MAP: return PNE_MAP32; - case PN_DESCRIBED: return PNE_DESCRIPTOR; - default: - return pn_error_format(encoder->error, PN_ERR, "not a value type: %u\n", type); - } -} - -static uint8_t pn_node2code(pn_encoder_t *encoder, pni_node_t *node) -{ - switch (node->atom.type) { - case PN_LONG: - if (-128 <= node->atom.u.as_long && node->atom.u.as_long <= 127) { - return PNE_SMALLLONG; - } else { - return PNE_LONG; - } - case PN_INT: - if (-128 <= node->atom.u.as_int && node->atom.u.as_int <= 127) { - return PNE_SMALLINT; - } else { - return PNE_INT; - } - case PN_ULONG: - if (node->atom.u.as_ulong < 256) { - return PNE_SMALLULONG; - } else { - return PNE_ULONG; - } - case PN_UINT: - if (node->atom.u.as_uint < 256) { - return PNE_SMALLUINT; - } else { - return PNE_UINT; - } - case PN_BOOL: - if (node->atom.u.as_bool) { - return PNE_TRUE; - } else { - return PNE_FALSE; - } - case PN_STRING: - if (node->atom.u.as_bytes.size < 256) { - return PNE_STR8_UTF8; - } else { - return PNE_STR32_UTF8; - } - case PN_SYMBOL: - if (node->atom.u.as_bytes.size < 256) { - return PNE_SYM8; - } else { - return PNE_SYM32; - } - case PN_BINARY: - if (node->atom.u.as_bytes.size < 256) { - return PNE_VBIN8; - } else { - return PNE_VBIN32; - } - default: - return pn_type2code(encoder, node->atom.type); - } -} - -static size_t pn_encoder_remaining(pn_encoder_t *encoder) { - char * end = encoder->output + encoder->size; - if (end > encoder->position) - return end - encoder->position; - else - return 0; -} - -static inline void pn_encoder_writef8(pn_encoder_t *encoder, uint8_t value) -{ - if (pn_encoder_remaining(encoder)) { - encoder->position[0] = value; - } - encoder->position++; -} - -static inline void pn_encoder_writef16(pn_encoder_t *encoder, uint16_t value) -{ - if (pn_encoder_remaining(encoder) >= 2) { - encoder->position[0] = 0xFF & (value >> 8); - encoder->position[1] = 0xFF & (value ); - } - encoder->position += 2; -} - -static inline void pn_encoder_writef32(pn_encoder_t *encoder, uint32_t value) -{ - if (pn_encoder_remaining(encoder) >= 4) { - encoder->position[0] = 0xFF & (value >> 24); - encoder->position[1] = 0xFF & (value >> 16); - encoder->position[2] = 0xFF & (value >> 8); - encoder->position[3] = 0xFF & (value ); - } - encoder->position += 4; -} - -static inline void pn_encoder_writef64(pn_encoder_t *encoder, uint64_t value) { - if (pn_encoder_remaining(encoder) >= 8) { - encoder->position[0] = 0xFF & (value >> 56); - encoder->position[1] = 0xFF & (value >> 48); - encoder->position[2] = 0xFF & (value >> 40); - encoder->position[3] = 0xFF & (value >> 32); - encoder->position[4] = 0xFF & (value >> 24); - encoder->position[5] = 0xFF & (value >> 16); - encoder->position[6] = 0xFF & (value >> 8); - encoder->position[7] = 0xFF & (value ); - } - encoder->position += 8; -} - -static inline void pn_encoder_writef128(pn_encoder_t *encoder, char *value) { - if (pn_encoder_remaining(encoder) >= 16) { - memmove(encoder->position, value, 16); - } - encoder->position += 16; -} - -static inline void pn_encoder_writev8(pn_encoder_t *encoder, const pn_bytes_t *value) -{ - pn_encoder_writef8(encoder, value->size); - if (pn_encoder_remaining(encoder) >= value->size) - memmove(encoder->position, value->start, value->size); - encoder->position += value->size; -} - -static inline void pn_encoder_writev32(pn_encoder_t *encoder, const pn_bytes_t *value) -{ - pn_encoder_writef32(encoder, value->size); - if (pn_encoder_remaining(encoder) >= value->size) - memmove(encoder->position, value->start, value->size); - encoder->position += value->size; -} - -/* True if node is an element of an array - not the descriptor. */ -static bool pn_is_in_array(pn_data_t *data, pni_node_t *parent, pni_node_t *node) { - return (parent && parent->atom.type == PN_ARRAY) /* In array */ - && !(parent->described && !node->prev); /* Not the descriptor */ -} - -/** True if node is the first element of an array, not the descriptor. - *@pre pn_is_in_array(data, parent, node) - */ -static bool pn_is_first_in_array(pn_data_t *data, pni_node_t *parent, pni_node_t *node) { - if (!node->prev) return !parent->described; /* First node */ - return parent->described && (!pn_data_node(data, node->prev)->prev); -} - -typedef union { - uint32_t i; - uint32_t a[2]; - uint64_t l; - float f; - double d; -} conv_t; - -static int pni_encoder_enter(void *ctx, pn_data_t *data, pni_node_t *node) -{ - pn_encoder_t *encoder = (pn_encoder_t *) ctx; - pni_node_t *parent = pn_data_node(data, node->parent); - pn_atom_t *atom = &node->atom; - uint8_t code; - conv_t c; - - /** In an array we don't write the code before each element, only the first. */ - if (pn_is_in_array(data, parent, node)) { - code = pn_type2code(encoder, parent->type); - if (pn_is_first_in_array(data, parent, node)) { - pn_encoder_writef8(encoder, code); - } - } else { - code = pn_node2code(encoder, node); - pn_encoder_writef8(encoder, code); - } - - switch (code) { - case PNE_DESCRIPTOR: - case PNE_NULL: - case PNE_TRUE: - case PNE_FALSE: return 0; - case PNE_BOOLEAN: pn_encoder_writef8(encoder, atom->u.as_bool); return 0; - case PNE_UBYTE: pn_encoder_writef8(encoder, atom->u.as_ubyte); return 0; - case PNE_BYTE: pn_encoder_writef8(encoder, atom->u.as_byte); return 0; - case PNE_USHORT: pn_encoder_writef16(encoder, atom->u.as_ushort); return 0; - case PNE_SHORT: pn_encoder_writef16(encoder, atom->u.as_short); return 0; - case PNE_UINT0: return 0; - case PNE_SMALLUINT: pn_encoder_writef8(encoder, atom->u.as_uint); return 0; - case PNE_UINT: pn_encoder_writef32(encoder, atom->u.as_uint); return 0; - case PNE_SMALLINT: pn_encoder_writef8(encoder, atom->u.as_int); return 0; - case PNE_INT: pn_encoder_writef32(encoder, atom->u.as_int); return 0; - case PNE_UTF32: pn_encoder_writef32(encoder, atom->u.as_char); return 0; - case PNE_ULONG: pn_encoder_writef64(encoder, atom->u.as_ulong); return 0; - case PNE_SMALLULONG: pn_encoder_writef8(encoder, atom->u.as_ulong); return 0; - case PNE_LONG: pn_encoder_writef64(encoder, atom->u.as_long); return 0; - case PNE_SMALLLONG: pn_encoder_writef8(encoder, atom->u.as_long); return 0; - case PNE_MS64: pn_encoder_writef64(encoder, atom->u.as_timestamp); return 0; - case PNE_FLOAT: c.f = atom->u.as_float; pn_encoder_writef32(encoder, c.i); return 0; - case PNE_DOUBLE: c.d = atom->u.as_double; pn_encoder_writef64(encoder, c.l); return 0; - case PNE_DECIMAL32: pn_encoder_writef32(encoder, atom->u.as_decimal32); return 0; - case PNE_DECIMAL64: pn_encoder_writef64(encoder, atom->u.as_decimal64); return 0; - case PNE_DECIMAL128: pn_encoder_writef128(encoder, atom->u.as_decimal128.bytes); return 0; - case PNE_UUID: pn_encoder_writef128(encoder, atom->u.as_uuid.bytes); return 0; - case PNE_VBIN8: pn_encoder_writev8(encoder, &atom->u.as_bytes); return 0; - case PNE_VBIN32: pn_encoder_writev32(encoder, &atom->u.as_bytes); return 0; - case PNE_STR8_UTF8: pn_encoder_writev8(encoder, &atom->u.as_bytes); return 0; - case PNE_STR32_UTF8: pn_encoder_writev32(encoder, &atom->u.as_bytes); return 0; - case PNE_SYM8: pn_encoder_writev8(encoder, &atom->u.as_bytes); return 0; - case PNE_SYM32: pn_encoder_writev32(encoder, &atom->u.as_bytes); return 0; - case PNE_ARRAY32: - node->start = encoder->position; - node->small = false; - // we'll backfill the size on exit - encoder->position += 4; - pn_encoder_writef32(encoder, node->described ? node->children - 1 : node->children); - if (node->described) - pn_encoder_writef8(encoder, 0); - return 0; - case PNE_LIST32: - case PNE_MAP32: - node->start = encoder->position; - node->small = false; - // we'll backfill the size later - encoder->position += 4; - pn_encoder_writef32(encoder, node->children); - return 0; - default: - return pn_error_format(data->error, PN_ERR, "unrecognized encoding: %u", code); - } -} - -#include <stdio.h> - -static int pni_encoder_exit(void *ctx, pn_data_t *data, pni_node_t *node) -{ - pn_encoder_t *encoder = (pn_encoder_t *) ctx; - char *pos; - - switch (node->atom.type) { - case PN_ARRAY: - if ((node->described && node->children == 1) || (!node->described && node->children == 0)) { - pn_encoder_writef8(encoder, pn_type2code(encoder, node->type)); - } - // Fallthrough - case PN_LIST: - case PN_MAP: - pos = encoder->position; - encoder->position = node->start; - if (node->small) { - // backfill size - size_t size = pos - node->start - 1; - pn_encoder_writef8(encoder, size); - } else { - // backfill size - size_t size = pos - node->start - 4; - pn_encoder_writef32(encoder, size); - } - encoder->position = pos; - return 0; - default: - return 0; - } -} - -ssize_t pn_encoder_encode(pn_encoder_t *encoder, pn_data_t *src, char *dst, size_t size) -{ - encoder->output = dst; - encoder->position = dst; - encoder->size = size; - - int err = pni_data_traverse(src, pni_encoder_enter, pni_encoder_exit, encoder); - if (err) return err; - size_t encoded = encoder->position - encoder->output; - if (encoded > size) { - pn_error_format(pn_data_error(src), PN_OVERFLOW, "not enough space to encode"); - return PN_OVERFLOW; - } - return (ssize_t)encoded; -} - -ssize_t pn_encoder_size(pn_encoder_t *encoder, pn_data_t *src) -{ - encoder->output = 0; - encoder->position = 0; - encoder->size = 0; - - pn_handle_t save = pn_data_point(src); - int err = pni_data_traverse(src, pni_encoder_enter, pni_encoder_exit, encoder); - pn_data_restore(src, save); - - if (err) return err; - return encoder->position - encoder->output; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/core/encoder.h ---------------------------------------------------------------------- diff --git a/proton-c/src/core/encoder.h b/proton-c/src/core/encoder.h deleted file mode 100644 index 20876cb..0000000 --- a/proton-c/src/core/encoder.h +++ /dev/null @@ -1,31 +0,0 @@ -#ifndef _PROTON_ENCODER_H -#define _PROTON_ENCODER_H 1 - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -typedef struct pn_encoder_t pn_encoder_t; - -pn_encoder_t *pn_encoder(void); -ssize_t pn_encoder_encode(pn_encoder_t *encoder, pn_data_t *src, char *dst, size_t size); -ssize_t pn_encoder_size(pn_encoder_t *encoder, pn_data_t *src); - -#endif /* encoder.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/src/core/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/core/engine-internal.h b/proton-c/src/core/engine-internal.h deleted file mode 100644 index 1dbe91c..0000000 --- a/proton-c/src/core/engine-internal.h +++ /dev/null @@ -1,377 +0,0 @@ -#ifndef _PROTON_ENGINE_INTERNAL_H -#define _PROTON_ENGINE_INTERNAL_H 1 - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include <proton/object.h> -#include <proton/engine.h> -#include <proton/types.h> - -#include "buffer.h" -#include "dispatcher.h" -#include "util.h" - -typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER} pn_endpoint_type_t; - -typedef struct pn_endpoint_t pn_endpoint_t; - -struct pn_condition_t { - pn_string_t *name; - pn_string_t *description; - pn_data_t *info; -}; - -struct pn_endpoint_t { - pn_endpoint_type_t type; - pn_state_t state; - pn_error_t *error; - pn_condition_t condition; - pn_condition_t remote_condition; - pn_endpoint_t *endpoint_next; - pn_endpoint_t *endpoint_prev; - pn_endpoint_t *transport_next; - pn_endpoint_t *transport_prev; - int refcount; // when this hits zero we generate a final event - bool modified; - bool freed; - bool referenced; -}; - -typedef struct { - pn_sequence_t id; - bool sent; - bool init; -} pn_delivery_state_t; - -typedef struct { - pn_sequence_t next; - pn_hash_t *deliveries; -} pn_delivery_map_t; - -typedef struct { - // XXX: stop using negative numbers - uint32_t local_handle; - uint32_t remote_handle; - pn_sequence_t delivery_count; - pn_sequence_t link_credit; -} pn_link_state_t; - -typedef struct { - // XXX: stop using negative numbers - uint16_t local_channel; - uint16_t remote_channel; - bool incoming_init; - pn_delivery_map_t incoming; - pn_delivery_map_t outgoing; - pn_sequence_t incoming_transfer_count; - pn_sequence_t incoming_window; - pn_sequence_t remote_incoming_window; - pn_sequence_t outgoing_transfer_count; - pn_sequence_t outgoing_window; - pn_hash_t *local_handles; - pn_hash_t *remote_handles; - - uint64_t disp_code; - bool disp_settled; - bool disp_type; - pn_sequence_t disp_first; - pn_sequence_t disp_last; - bool disp; -} pn_session_state_t; - -typedef struct pn_io_layer_t { - ssize_t (*process_input)(struct pn_transport_t *transport, unsigned int layer, const char *, size_t); - ssize_t (*process_output)(struct pn_transport_t *transport, unsigned int layer, char *, size_t); - void (*handle_error)(struct pn_transport_t* transport, unsigned int layer); - pn_timestamp_t (*process_tick)(struct pn_transport_t *transport, unsigned int layer, pn_timestamp_t); - size_t (*buffered_output)(struct pn_transport_t *transport); // how much output is held -} pn_io_layer_t; - -extern const pn_io_layer_t pni_passthru_layer; -extern const pn_io_layer_t ssl_layer; -extern const pn_io_layer_t sasl_header_layer; -extern const pn_io_layer_t sasl_write_header_layer; - -// Bit flag defines for the protocol layers -typedef uint8_t pn_io_layer_flags_t; -#define LAYER_NONE 0 -#define LAYER_AMQP1 1 -#define LAYER_AMQPSASL 2 -#define LAYER_AMQPSSL 4 -#define LAYER_SSL 8 - -typedef struct pni_sasl_t pni_sasl_t; -typedef struct pni_ssl_t pni_ssl_t; - -struct pn_transport_t { - pn_tracer_t tracer; - pni_sasl_t *sasl; - pni_ssl_t *ssl; - pn_connection_t *connection; // reference counted - char *remote_container; - char *remote_hostname; - pn_data_t *remote_offered_capabilities; - pn_data_t *remote_desired_capabilities; - pn_data_t *remote_properties; - pn_data_t *disp_data; - //#define PN_DEFAULT_MAX_FRAME_SIZE (16*1024) -#define PN_DEFAULT_MAX_FRAME_SIZE (0) /* for now, allow unlimited size */ - uint32_t local_max_frame; - uint32_t remote_max_frame; - pn_condition_t remote_condition; - pn_condition_t condition; - pn_error_t *error; - -#define PN_IO_LAYER_CT 3 - const pn_io_layer_t *io_layers[PN_IO_LAYER_CT]; - - /* dead remote detection */ - pn_millis_t local_idle_timeout; - pn_millis_t remote_idle_timeout; - pn_timestamp_t dead_remote_deadline; - uint64_t last_bytes_input; - - /* keepalive */ - pn_timestamp_t keepalive_deadline; - uint64_t last_bytes_output; - - pn_hash_t *local_channels; - pn_hash_t *remote_channels; - - - /* scratch area */ - pn_string_t *scratch; - pn_data_t *args; - pn_data_t *output_args; - pn_buffer_t *frame; // frame under construction - // Temporary - size_t capacity; - size_t available; /* number of raw bytes pending output */ - char *output; - - /* statistics */ - uint64_t bytes_input; - uint64_t bytes_output; - uint64_t output_frames_ct; - uint64_t input_frames_ct; - - /* output buffered for send */ - size_t output_size; - size_t output_pending; - char *output_buf; - - /* input from peer */ - size_t input_size; - size_t input_pending; - char *input_buf; - - pn_record_t *context; - - pn_trace_t trace; - - /* - * The maximum channel number can be constrained in several ways: - * 1. an unchangeable limit imposed by this library code - * 2. a limit imposed by the remote peer when the connection is opened, - * which this app must honor - * 3. a limit imposed by this app, which may be raised and lowered - * until the OPEN frame is sent. - * These constraints are all summed up in channel_max, below. - */ - #define PN_IMPL_CHANNEL_MAX 32767 - uint16_t local_channel_max; - uint16_t remote_channel_max; - uint16_t channel_max; - - pn_io_layer_flags_t allowed_layers; - pn_io_layer_flags_t present_layers; - - bool freed; - bool open_sent; - bool open_rcvd; - bool close_sent; - bool close_rcvd; - bool tail_closed; // input stream closed by driver - bool head_closed; - bool done_processing; // if true, don't call pn_process again - bool posted_idle_timeout; - bool server; - bool halt; - bool auth_required; - bool authenticated; - bool encryption_required; - - bool referenced; -}; - -struct pn_connection_t { - pn_endpoint_t endpoint; - pn_endpoint_t *endpoint_head; - pn_endpoint_t *endpoint_tail; - pn_endpoint_t *transport_head; // reference counted - pn_endpoint_t *transport_tail; - pn_list_t *sessions; - pn_list_t *freed; - pn_transport_t *transport; - pn_delivery_t *work_head; - pn_delivery_t *work_tail; - pn_delivery_t *tpwork_head; // reference counted - pn_delivery_t *tpwork_tail; - pn_string_t *container; - pn_string_t *hostname; - pn_string_t *auth_user; - pn_string_t *auth_password; - pn_data_t *offered_capabilities; - pn_data_t *desired_capabilities; - pn_data_t *properties; - pn_collector_t *collector; - pn_record_t *context; - pn_list_t *delivery_pool; -}; - -struct pn_session_t { - pn_endpoint_t endpoint; - pn_connection_t *connection; // reference counted - pn_list_t *links; - pn_list_t *freed; - pn_record_t *context; - size_t incoming_capacity; - pn_sequence_t incoming_bytes; - pn_sequence_t outgoing_bytes; - pn_sequence_t incoming_deliveries; - pn_sequence_t outgoing_deliveries; - pn_sequence_t outgoing_window; - pn_session_state_t state; -}; - -struct pn_terminus_t { - pn_string_t *address; - pn_data_t *properties; - pn_data_t *capabilities; - pn_data_t *outcomes; - pn_data_t *filter; - pn_durability_t durability; - pn_expiry_policy_t expiry_policy; - pn_seconds_t timeout; - pn_terminus_type_t type; - pn_distribution_mode_t distribution_mode; - bool dynamic; -}; - -struct pn_link_t { - pn_endpoint_t endpoint; - pn_terminus_t source; - pn_terminus_t target; - pn_terminus_t remote_source; - pn_terminus_t remote_target; - pn_link_state_t state; - pn_string_t *name; - pn_session_t *session; // reference counted - pn_delivery_t *unsettled_head; - pn_delivery_t *unsettled_tail; - pn_delivery_t *current; - pn_record_t *context; - size_t unsettled_count; - uint64_t max_message_size; - uint64_t remote_max_message_size; - pn_sequence_t available; - pn_sequence_t credit; - pn_sequence_t queued; - int drained; // number of drained credits - uint8_t snd_settle_mode; - uint8_t rcv_settle_mode; - uint8_t remote_snd_settle_mode; - uint8_t remote_rcv_settle_mode; - bool drain_flag_mode; // receiver only - bool drain; - bool detached; -}; - -struct pn_disposition_t { - pn_condition_t condition; - uint64_t type; - pn_data_t *data; - pn_data_t *annotations; - uint64_t section_offset; - uint32_t section_number; - bool failed; - bool undeliverable; - bool settled; -}; - -struct pn_delivery_t { - pn_disposition_t local; - pn_disposition_t remote; - pn_link_t *link; // reference counted - pn_buffer_t *tag; - pn_delivery_t *unsettled_next; - pn_delivery_t *unsettled_prev; - pn_delivery_t *work_next; - pn_delivery_t *work_prev; - pn_delivery_t *tpwork_next; - pn_delivery_t *tpwork_prev; - pn_delivery_state_t state; - pn_buffer_t *bytes; - pn_record_t *context; - bool updated; - bool settled; // tracks whether we're in the unsettled list or not - bool work; - bool tpwork; - bool done; - bool referenced; -}; - -#define PN_SET_LOCAL(OLD, NEW) \ - (OLD) = ((OLD) & PN_REMOTE_MASK) | (NEW) - -#define PN_SET_REMOTE(OLD, NEW) \ - (OLD) = ((OLD) & PN_LOCAL_MASK) | (NEW) - -void pn_link_dump(pn_link_t *link); - -void pn_dump(pn_connection_t *conn); -void pn_transport_sasl_init(pn_transport_t *transport); - -void pn_condition_init(pn_condition_t *condition); -void pn_condition_tini(pn_condition_t *condition); -void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit); -void pn_real_settle(pn_delivery_t *delivery); // will free delivery if link is freed -void pn_clear_tpwork(pn_delivery_t *delivery); -void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery); -void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint); -void pn_connection_bound(pn_connection_t *conn); -void pn_connection_unbound(pn_connection_t *conn); -int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...); -void pn_set_error_layer(pn_transport_t *transport); -void pn_session_unbound(pn_session_t* ssn); -void pn_link_unbound(pn_link_t* link); -void pn_ep_incref(pn_endpoint_t *endpoint); -void pn_ep_decref(pn_endpoint_t *endpoint); - -int pn_post_frame(pn_transport_t *transport, uint8_t type, uint16_t ch, const char *fmt, ...); - -typedef enum {IN, OUT} pn_dir_t; - -void pn_do_trace(pn_transport_t *transport, uint16_t ch, pn_dir_t dir, - pn_data_t *args, const char *payload, size_t size); - -#endif /* engine-internal.h */ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
