http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/bindings/python/setup.py.in ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/setup.py.in b/proton-c/bindings/python/setup.py.in new file mode 100644 index 0000000..94f3dfc --- /dev/null +++ b/proton-c/bindings/python/setup.py.in @@ -0,0 +1,107 @@ +#!/usr/bin/env python +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +from distutils.core import setup, Extension +import logging +import os +import sys + +_c_module = '@SWIG_MODULE_cproton_REAL_NAME@' +_src_file = '@PN_SWIG_PYTHON_C_WRAPPER@' +_version = '@PN_VERSION@' +_release = 0 + +if "--proton-install-prefix" in sys.argv: + # special option used only if the python headers and library have been + # installed to a non-standard directory. This can be done during 'make + # install' from the proton build tree by using the cmake option + # -DCMAKE_INSTALL_PREFIX. The location of the headers and library must be + # specified so we can build the binding's C extension. + i = sys.argv.index("--proton-install-prefix") + 1 + if i >= len(sys.argv): + raise ValueError("--proton-install-prefix requires a path parameter.") + _prefix = sys.argv[i] + # remove the proton arguments to they don't conflict with setup.py's other + # command arguments: + del sys.argv[i] + sys.argv.remove("--proton-install-prefix") + _destdir = os.environ.get("DESTDIR", "") + if _destdir and os.path.isabs(_prefix): + # DESTDIR may be used on unix systems to put the entire install tree + # under a particular directory. However, if _prefix is an absolute + # path, os.path.join will discard DESTDIR, so strip off the leading + # separator + _prefix = _prefix.lstrip(os.path.sep) + + _inc_dir = os.path.join(_destdir, + _prefix, + '@INCLUDE_INSTALL_DIR@') + _lib_dir = os.path.join(_destdir, + _prefix, + '@LIB_INSTALL_DIR@') + + swig_ext = Extension(_c_module, [_src_file], + libraries=['qpid-proton'], + include_dirs=[_inc_dir], + library_dirs=[_lib_dir]) +else: + swig_ext = Extension(_c_module, [_src_file], + libraries=['qpid-proton']) + +_help_description = """Before you can build or install these bindings, you must +first install version @PN_VERSION@ of the Proton development library +(libqpid-proton) and its C header files. These files must be available in order +to build this packages' C-based extension. + +Packages for the Proton development library may be provided by your system's +distribution. For example, the qpid-proton-c-devel RPM is available for +Centos/RHEL via EPEL. A libqpid-proton2-dev deb file is available for Ubuntu +via the Apache Qpid PPA (ppa:qpid/released). + +If your distribution does not make these packages available, you can download +the Proton sources directly from the Apache Qpid project: + + http://qpid.apache.org + +This package is compatible with the @PN_VERSION@ release of the Proton +development library. + +If you need additional help, see http://qpid.apache.org/discussion.html +""" + +_long_description = """This package contains the Python bindings for the Apache +QPID Proton library.\n%s""" % _help_description + +try: + setup(name="python-qpid-proton", + version="%s-%d" % (_version, _release), + author="Apache Qpid", + author_email="[email protected]", + py_modules=["proton", "cproton"], + url="http://qpid.apache.org/", + description="Python bindings for the Proton library", + long_description=_long_description, + license="Apache Software License", + classifiers=["License :: OSI Approved :: Apache Software License", + "Intended Audience :: Developers", + "Programming Language :: Python"], + ext_modules=[swig_ext]) +except: + logging.error("setup failed!\n%s", _help_description) + raise
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/bindings/ruby/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/CMakeLists.txt b/proton-c/bindings/ruby/CMakeLists.txt index 9336abf..e77e4de 100644 --- a/proton-c/bindings/ruby/CMakeLists.txt +++ b/proton-c/bindings/ruby/CMakeLists.txt @@ -20,7 +20,9 @@ if (NOT DEFAULT_RUBY_TESTING) message(FATAL_ERROR "Ruby bindings cannot be tested while missing dependencies") endif (NOT DEFAULT_RUBY_TESTING) - +list(APPEND SWIG_MODULE_cproton-ruby_EXTRA_DEPS + ${CMAKE_SOURCE_DIR}/proton-c/include/proton/cproton.i +) include_directories (${RUBY_INCLUDE_PATH}) swig_add_module(cproton-ruby ruby ruby.i) swig_link_libraries(cproton-ruby ${BINDING_DEPS} ${RUBY_LIBRARY}) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/bindings/ruby/lib/qpid_proton.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb index cf044f6..e28c684 100644 --- a/proton-c/bindings/ruby/lib/qpid_proton.rb +++ b/proton-c/bindings/ruby/lib/qpid_proton.rb @@ -20,6 +20,7 @@ require "cproton" require "date" +require "qpid_proton/version" require "qpid_proton/described" require "qpid_proton/mapping" require "qpid_proton/array" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/bindings/ruby/lib/qpid_proton/version.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/qpid_proton/version.rb b/proton-c/bindings/ruby/lib/qpid_proton/version.rb new file mode 100644 index 0000000..cd30bf0 --- /dev/null +++ b/proton-c/bindings/ruby/lib/qpid_proton/version.rb @@ -0,0 +1,29 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +module Qpid + + module Proton + + PN_VERSION_MAJOR = Cproton::PN_VERSION_MAJOR + PN_VERSION_MINOR = Cproton::PN_VERSION_MINOR + + end + +end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/bindings/ruby/qpid_proton.gemspec ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/qpid_proton.gemspec b/proton-c/bindings/ruby/qpid_proton.gemspec index d193563..f61d14c 100644 --- a/proton-c/bindings/ruby/qpid_proton.gemspec +++ b/proton-c/bindings/ruby/qpid_proton.gemspec @@ -8,6 +8,7 @@ system "swig -ruby -I../../include -o ext/cproton/cproton.c ruby.i" Gem::Specification.new do |s| s.name = "qpid_proton" s.version = "0.3" + s.licenses = ['Apache-2.0'] s.platform = Gem::Platform::RUBY s.authors = ["Darryl L. Pierce"] s.email = ["[email protected]"] http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/bindings/ruby/ruby.i ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/ruby.i b/proton-c/bindings/ruby/ruby.i index fd11786..502fa92 100644 --- a/proton-c/bindings/ruby/ruby.i +++ b/proton-c/bindings/ruby/ruby.i @@ -344,5 +344,4 @@ bool pn_ssl_get_cipher_name(pn_ssl_t *ssl, char *OUTPUT, size_t MAX_OUTPUT_SIZE) bool pn_ssl_get_protocol_name(pn_ssl_t *ssl, char *OUTPUT, size_t MAX_OUTPUT_SIZE); %ignore pn_ssl_get_protocol_name; - %include "proton/cproton.i" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/docs/man/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/docs/man/CMakeLists.txt b/proton-c/docs/man/CMakeLists.txt index d907a67..bd33e90 100644 --- a/proton-c/docs/man/CMakeLists.txt +++ b/proton-c/docs/man/CMakeLists.txt @@ -17,6 +17,6 @@ # under the License. # -INSTALL (FILES proton.1 +INSTALL (FILES proton.1 proton-dump.1 DESTINATION ${MAN_INSTALL_DIR}/man1) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/docs/man/proton-dump.1 ---------------------------------------------------------------------- diff --git a/proton-c/docs/man/proton-dump.1 b/proton-c/docs/man/proton-dump.1 new file mode 100644 index 0000000..0920d62 --- /dev/null +++ b/proton-c/docs/man/proton-dump.1 @@ -0,0 +1,19 @@ +.\" DO NOT MODIFY THIS FILE! It was generated by help2man 1.44.1. +.TH USAGE: "1" "August 2014" "Usage: proton-dump [FILE1] [FILEn] ..." "User Commands" +.SH NAME +proton-dump - display the contents of an AMQP dump file containing frame data +.SH SYNOPSIS +.B proton-dump +[\fIFILE1\fR] [\fIFILEn\fR] ... +.SH DESCRIPTION +Displays the content of an AMQP dump file containing frame data. +.TP +[FILEn] +Dump file to be displayed. +.PP +Displays the content of an AMQP dump file containing frame data. +.TP +[FILEn] +Dump file to be displayed. +.SH "SEE ALSO" +proton(1) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/buffer.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/buffer.h b/proton-c/include/proton/buffer.h index a3cf843..26d4bb3 100644 --- a/proton-c/include/proton/buffer.h +++ b/proton-c/include/proton/buffer.h @@ -29,6 +29,11 @@ extern "C" { #endif +typedef struct { + size_t size; + char *start; +} pn_buffer_memory_t; + typedef struct pn_buffer_t pn_buffer_t; PN_EXTERN pn_buffer_t *pn_buffer(size_t capacity); @@ -44,6 +49,7 @@ PN_EXTERN int pn_buffer_trim(pn_buffer_t *buf, size_t left, size_t right); PN_EXTERN void pn_buffer_clear(pn_buffer_t *buf); PN_EXTERN int pn_buffer_defrag(pn_buffer_t *buf); PN_EXTERN pn_bytes_t pn_buffer_bytes(pn_buffer_t *buf); +PN_EXTERN pn_buffer_memory_t pn_buffer_memory(pn_buffer_t *buf); PN_EXTERN int pn_buffer_print(pn_buffer_t *buf); #ifdef __cplusplus http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/cproton.i ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i index cef8b1f..1a9ad7f 100644 --- a/proton-c/include/proton/cproton.i +++ b/proton-c/include/proton/cproton.i @@ -30,6 +30,26 @@ typedef long long int int64_t; /* Parse these interface header files to generate APIs for script languages */ %include "proton/import_export.h" + +%ignore _PROTON_VERSION_H; +%include "proton/version.h" + +/* We cannot safely just wrap pn_bytes_t but each language binding must have a typemap for it - presumably to a string type */ +%ignore pn_bytes_t; + +/* There is no need to wrap pn_class_t aa it is an internal implementation detail and cannot be used outside the library */ +%ignore pn_class_t; + +/* Ignore C APIs related to pn_atom_t - they can all be achieved with pn_data_t */ +%ignore pn_atom_t; +%ignore pn_atom_t_u; /* Seem to need this even though its nested in pn_atom_t */ +%ignore pn_data_get_atom; +%ignore pn_data_put_atom; + +%ignore pn_delivery_tag_t; +%ignore pn_decimal128_t; +%ignore pn_uuid_t; + %include "proton/types.h" %ignore pn_string_vformat; %ignore pn_string_vaddf; @@ -60,7 +80,7 @@ typedef long long int int64_t; %aggregate_check(int, check_sasl_outcome, PN_SASL_NONE, PN_SASL_OK, PN_SASL_AUTH, - PN_SASL_SYS, PN_SASL_PERM, PN_SASL_TEMP); + PN_SASL_SYS, PN_SASL_PERM, PN_SASL_TEMP, PN_SASL_SKIPPED); %aggregate_check(int, check_sasl_state, PN_SASL_CONF, PN_SASL_IDLE, PN_SASL_STEP, @@ -982,6 +1002,12 @@ typedef long long int int64_t; sasl != NULL; } +%contract pn_sasl_allow_skip(pn_sasl_t *sasl, bool allow) +{ + require: + sasl != NULL; +} + %contract pn_sasl_plain(pn_sasl_t *sasl, const char *username, const char *password) { require: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/event.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h index fdb2803..c57c77d 100644 --- a/proton-c/include/proton/event.h +++ b/proton-c/include/proton/event.h @@ -81,8 +81,12 @@ typedef struct pn_event_t pn_event_t; */ typedef enum { PN_EVENT_CATEGORY_NONE = 0, - PN_EVENT_CATEGORY_PROTOCOL = 0x00010000, - PN_EVENT_CATEGORY_COUNT = 2 + PN_EVENT_CATEGORY_CONNECTION = 0x00010000, + PN_EVENT_CATEGORY_SESSION = 0x00020000, + PN_EVENT_CATEGORY_LINK = 0x00030000, + PN_EVENT_CATEGORY_DELIVERY = 0x00040000, + PN_EVENT_CATEGORY_TRANSPORT = 0x00050000, + PN_EVENT_CATEGORY_COUNT = 6 } pn_event_category_t; /** @@ -94,45 +98,137 @@ typedef enum { * ever be generated. */ PN_EVENT_NONE = 0, + + /** + * The connection has been created. This is the first event that + * will ever be issued for a connection. Events of this type point + * to the relevant connection. + */ + PN_CONNECTION_INIT = PN_EVENT_CATEGORY_CONNECTION + 1, + + /** + * The local connection endpoint has been closed. Events of this + * type point to the relevant connection. + */ + PN_CONNECTION_OPEN = PN_EVENT_CATEGORY_CONNECTION + 2, + + /** + * The remote endpoint has opened the connection. Events of this + * type point to the relevant connection. + */ + PN_CONNECTION_REMOTE_OPEN = PN_EVENT_CATEGORY_CONNECTION + 3, + + /** + * The local connection endpoint has been closed. Events of this + * type point to the relevant connection. + */ + PN_CONNECTION_CLOSE = PN_EVENT_CATEGORY_CONNECTION + 4, + + /** + * The remote endpoint has closed the connection. Events of this + * type point to the relevant connection. + */ + PN_CONNECTION_REMOTE_CLOSE = PN_EVENT_CATEGORY_CONNECTION + 5, + + /** + * The connection has been freed and any outstanding processing has + * been completed. This is the final event that will ever be issued + * for a connection. + */ + PN_CONNECTION_FINAL = PN_EVENT_CATEGORY_CONNECTION + 6, + + /** + * The session has been created. This is the first event that will + * ever be issued for a session. + */ + PN_SESSION_INIT = PN_EVENT_CATEGORY_SESSION + 1, + /** - * The endpoint state flags for a connection have changed. Events of - * this type point to the relevant connection as well as its - * associated transport. + * The local session endpoint has been opened. Events of this type + * point ot the relevant session. */ - PN_CONNECTION_REMOTE_STATE = PN_EVENT_CATEGORY_PROTOCOL+1, - PN_CONNECTION_LOCAL_STATE = PN_EVENT_CATEGORY_PROTOCOL+2, + PN_SESSION_OPEN = PN_EVENT_CATEGORY_SESSION + 2, + /** - * The endpoint state flags for a session have changed. Events of - * this type point to the relevant session as well as its associated - * connection and transport. + * The remote endpoint has opened the session. Events of this type + * point to the relevant session. */ - PN_SESSION_REMOTE_STATE = PN_EVENT_CATEGORY_PROTOCOL+3, - PN_SESSION_LOCAL_STATE = PN_EVENT_CATEGORY_PROTOCOL+4, + PN_SESSION_REMOTE_OPEN = PN_EVENT_CATEGORY_SESSION + 3, + /** - * The endpoint state flags for a link have changed. Events of this - * type point to the relevant link as well as its associated - * session, connection, and transport. + * The local session endpoint has been closed. Events of this type + * point ot the relevant session. */ - PN_LINK_REMOTE_STATE = PN_EVENT_CATEGORY_PROTOCOL+5, - PN_LINK_LOCAL_STATE = PN_EVENT_CATEGORY_PROTOCOL+6, + PN_SESSION_CLOSE = PN_EVENT_CATEGORY_SESSION + 4, + + /** + * The remote endpoint has closed the session. Events of this type + * point to the relevant session. + */ + PN_SESSION_REMOTE_CLOSE = PN_EVENT_CATEGORY_SESSION + 5, + + /** + * The session has been freed and any outstanding processing has + * been completed. This is the final event that will ever be issued + * for a session. + */ + PN_SESSION_FINAL = PN_EVENT_CATEGORY_SESSION + 6, + + /** + * The link has been created. This is the first event that will ever + * be issued for a link. + */ + PN_LINK_INIT = PN_EVENT_CATEGORY_LINK + 1, + + /** + * The local link endpoint has been opened. Events of this type + * point ot the relevant link. + */ + PN_LINK_OPEN = PN_EVENT_CATEGORY_LINK + 2, + + /** + * The remote endpoint has opened the link. Events of this type + * point to the relevant link. + */ + PN_LINK_REMOTE_OPEN = PN_EVENT_CATEGORY_LINK + 3, + + /** + * The local link endpoint has been closed. Events of this type + * point ot the relevant link. + */ + PN_LINK_CLOSE = PN_EVENT_CATEGORY_LINK + 4, + + /** + * The remote endpoint has closed the link. Events of this type + * point to the relevant link. + */ + PN_LINK_REMOTE_CLOSE = PN_EVENT_CATEGORY_LINK + 5, + /** * The flow control state for a link has changed. Events of this - * type point to the relevant link along with its associated - * session, connection, and transport. + * type point to the relevant link. */ - PN_LINK_FLOW = PN_EVENT_CATEGORY_PROTOCOL+7, + PN_LINK_FLOW = PN_EVENT_CATEGORY_LINK + 6, + + /** + * The link has been freed and any outstanding processing has been + * completed. This is the final event that will ever be issued for a + * link. Events of this type point to the relevant link. + */ + PN_LINK_FINAL = PN_EVENT_CATEGORY_LINK + 7, + /** * A delivery has been created or updated. Events of this type point - * to the relevant delivery as well as its associated link, session, - * connection, and transport. + * to the relevant delivery. */ - PN_DELIVERY = PN_EVENT_CATEGORY_PROTOCOL+8, + PN_DELIVERY = PN_EVENT_CATEGORY_DELIVERY + 1, + /** * The transport has new data to read and/or write. Events of this - * type point to the relevant transport as well as its associated - * connection. + * type point to the relevant transport. */ - PN_TRANSPORT = PN_EVENT_CATEGORY_PROTOCOL+9 + PN_TRANSPORT = PN_EVENT_CATEGORY_TRANSPORT + 1 + } pn_event_type_t; /** @@ -198,6 +294,11 @@ PN_EXTERN pn_event_type_t pn_event_type(pn_event_t *event); PN_EXTERN pn_event_category_t pn_event_category(pn_event_t *event); /** + * Get the context associated with an event. + */ +PN_EXTERN void *pn_event_context(pn_event_t *event); + +/** * Get the connection associated with an event. * * @param[in] event an event object http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/io.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/io.h b/proton-c/include/proton/io.h index fffc09a..2d56736 100644 --- a/proton-c/include/proton/io.h +++ b/proton-c/include/proton/io.h @@ -44,6 +44,7 @@ typedef int pn_socket_t; #endif typedef struct pn_io_t pn_io_t; +typedef struct pn_selector_t pn_selector_t; PN_EXTERN pn_io_t *pn_io(void); PN_EXTERN void pn_io_free(pn_io_t *io); @@ -58,6 +59,7 @@ PN_EXTERN int pn_pipe(pn_io_t *io, pn_socket_t *dest); PN_EXTERN ssize_t pn_read(pn_io_t *io, pn_socket_t socket, void *buf, size_t size); PN_EXTERN ssize_t pn_write(pn_io_t *io, pn_socket_t socket, const void *buf, size_t size); PN_EXTERN bool pn_wouldblock(pn_io_t *io); +PN_EXTERN pn_selector_t *pn_io_selector(pn_io_t *io); #ifdef __cplusplus } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/object.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/object.h b/proton-c/include/proton/object.h index dc4983a..9f6e63c 100644 --- a/proton-c/include/proton/object.h +++ b/proton-c/include/proton/object.h @@ -43,6 +43,7 @@ typedef void *(*pn_iterator_next_t)(void *state); typedef struct pn_iterator_t pn_iterator_t; typedef struct { + const char *name; void (*initialize)(void *); void (*finalize)(void *); uintptr_t (*hashcode)(void *); @@ -51,6 +52,7 @@ typedef struct { } pn_class_t; #define PN_CLASS(PREFIX) { \ + #PREFIX, \ PREFIX ## _initialize, \ PREFIX ## _finalize, \ PREFIX ## _hashcode, \ @@ -58,14 +60,17 @@ typedef struct { PREFIX ## _inspect \ } -PN_EXTERN void *pn_new(size_t size, pn_class_t *clazz); -PN_EXTERN void pn_initialize(void *object, pn_class_t *clazz); +PN_EXTERN void *pn_new(size_t size, const pn_class_t* clazz); +PN_EXTERN void *pn_new2(size_t size, const pn_class_t* clazz, void *from); +PN_EXTERN void pn_initialize(void *object, const pn_class_t *clazz); PN_EXTERN void *pn_incref(void *object); +PN_EXTERN void *pn_incref2(void *object, void *from); PN_EXTERN void pn_decref(void *object); +PN_EXTERN void pn_decref2(void *object, void *from); PN_EXTERN int pn_refcount(void *object); PN_EXTERN void pn_finalize(void *object); PN_EXTERN void pn_free(void *object); -PN_EXTERN pn_class_t *pn_class(void *object); +PN_EXTERN const pn_class_t *pn_class(void* object); PN_EXTERN uintptr_t pn_hashcode(void *object); PN_EXTERN intptr_t pn_compare(void *a, void *b); PN_EXTERN bool pn_equals(void *a, void *b); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/sasl.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/sasl.h b/proton-c/include/proton/sasl.h index 0cd9141..1f132e6 100644 --- a/proton-c/include/proton/sasl.h +++ b/proton-c/include/proton/sasl.h @@ -54,7 +54,8 @@ typedef enum { PN_SASL_AUTH=1, /** failed due to bad credentials */ PN_SASL_SYS=2, /** failed due to a system error */ PN_SASL_PERM=3, /** failed due to unrecoverable error */ - PN_SASL_TEMP=4 /** failed due to transient error */ + PN_SASL_TEMP=4, /** failed due to transient error */ + PN_SASL_SKIPPED=5 /** the peer didn't perform the sasl exchange */ } pn_sasl_outcome_t; /** The state of the SASL negotiation process */ @@ -113,6 +114,18 @@ PN_EXTERN void pn_sasl_client(pn_sasl_t *sasl); */ PN_EXTERN void pn_sasl_server(pn_sasl_t *sasl); +/** Configure a SASL server layer to permit the client to skip the SASL exchange. + * + * If the peer client skips the SASL exchange (i.e. goes right to the AMQP header) + * this server layer will succeed and result in the outcome of PN_SASL_SKIPPED. + * The default behavior is to fail and close the connection if the client skips + * SASL. + * + * @param[in] sasl the SASL layer to configure + * @param[in] allow true -> allow skip; false -> forbid skip + */ + PN_EXTERN void pn_sasl_allow_skip(pn_sasl_t *sasl, bool allow); + /** Configure the SASL layer to use the "PLAIN" mechanism. * * A utility function to configure a simple client SASL layer using http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/selector.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/selector.h b/proton-c/include/proton/selector.h index 37370d4..7d599b0 100644 --- a/proton-c/include/proton/selector.h +++ b/proton-c/include/proton/selector.h @@ -34,9 +34,7 @@ extern "C" { #define PN_WRITABLE (2) #define PN_EXPIRED (4) -typedef struct pn_selector_t pn_selector_t; - -PN_EXTERN pn_selector_t *pn_selector(void); +pn_selector_t *pni_selector(void); PN_EXTERN void pn_selector_free(pn_selector_t *selector); PN_EXTERN void pn_selector_add(pn_selector_t *selector, pn_selectable_t *selectable); PN_EXTERN void pn_selector_update(pn_selector_t *selector, pn_selectable_t *selectable); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/terminus.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/terminus.h b/proton-c/include/proton/terminus.h index 9c9096b..3765b28 100644 --- a/proton-c/include/proton/terminus.h +++ b/proton-c/include/proton/terminus.h @@ -90,10 +90,10 @@ typedef enum { * counting down. */ typedef enum { - PN_LINK_CLOSE, /**< the terminus is orphaned when the parent link is closed */ - PN_SESSION_CLOSE, /**< the terminus is orphaned when the parent session is closed */ - PN_CONNECTION_CLOSE, /**< the terminus is orphaned when the parent connection is closed */ - PN_NEVER /**< the terminus is never considered orphaned */ + PN_EXPIRE_WITH_LINK, /**< the terminus is orphaned when the parent link is closed */ + PN_EXPIRE_WITH_SESSION, /**< the terminus is orphaned when the parent session is closed */ + PN_EXPIRE_WITH_CONNECTION, /**< the terminus is orphaned when the parent connection is closed */ + PN_EXPIRE_NEVER /**< the terminus is never considered orphaned */ } pn_expiry_policy_t; /** http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/transport.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/transport.h b/proton-c/include/proton/transport.h index 1fa24c8..abe2853 100644 --- a/proton-c/include/proton/transport.h +++ b/proton-c/include/proton/transport.h @@ -328,16 +328,17 @@ PN_EXTERN char *pn_transport_tail(pn_transport_t *transport); * * This is equivalent to copying @c size bytes afther the tail pointer * and then calling ::pn_transport_process with an argument of @c - * size. It is an error to call this with a @c size larger than the - * capacity reported by ::pn_transport_capacity. + * size. Only some of the bytes will be copied if there is + * insufficienty capacity available. Use ::pn_transport_capacity to + * determine how much capacity the transport has. * * @param[in] transport the transport * @param[in] src the start of the data to push into the transport * @param[in] size the amount of data to push into the transport * - * @return 0 on success, or error code if < 0 + * @return the number of bytes pushed on success, or error code if < 0 */ -PN_EXTERN int pn_transport_push(pn_transport_t *transport, const char *src, size_t size); +PN_EXTERN ssize_t pn_transport_push(pn_transport_t *transport, const char *src, size_t size); /** * Process input data following the tail pointer. @@ -404,9 +405,9 @@ PN_EXTERN const char *pn_transport_head(pn_transport_t *transport); * @param[in] transport the transport * @param[out] dst the destination buffer * @param[in] size the capacity of the destination buffer - * @return 0 on success, or error code if < 0 + * @return number of bytes copied on success, or error code if < 0 */ -PN_EXTERN int pn_transport_peek(pn_transport_t *transport, char *dst, size_t size); +PN_EXTERN ssize_t pn_transport_peek(pn_transport_t *transport, char *dst, size_t size); /** * Removes @c size bytes of output from the pending output queue http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/include/proton/types.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/types.h b/proton-c/include/proton/types.h index 4182f25..d15b745 100644 --- a/proton-c/include/proton/types.h +++ b/proton-c/include/proton/types.h @@ -23,6 +23,7 @@ */ #include <proton/import_export.h> +#include <stddef.h> #include <sys/types.h> #include <proton/type_compat.h> @@ -58,11 +59,10 @@ typedef struct { typedef struct { size_t size; - char *start; + const char *start; } pn_bytes_t; -PN_EXTERN pn_bytes_t pn_bytes(size_t size, char *start); -PN_EXTERN pn_bytes_t pn_bytes_dup(size_t size, const char *start); +PN_EXTERN pn_bytes_t pn_bytes(size_t size, const char *start); /** @} */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/buffer.c ---------------------------------------------------------------------- diff --git a/proton-c/src/buffer.c b/proton-c/src/buffer.c index 1371831..b69034b 100644 --- a/proton-c/src/buffer.c +++ b/proton-c/src/buffer.c @@ -273,6 +273,18 @@ pn_bytes_t pn_buffer_bytes(pn_buffer_t *buf) } } +pn_buffer_memory_t pn_buffer_memory(pn_buffer_t *buf) +{ + if (buf) { + pn_buffer_defrag(buf); + pn_buffer_memory_t r = {buf->size, buf->bytes}; + return r; + } else { + pn_buffer_memory_t r = {0, NULL}; + return r; + } +} + int pn_buffer_print(pn_buffer_t *buf) { printf("pn_buffer(\""); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/codec/codec.c ---------------------------------------------------------------------- diff --git a/proton-c/src/codec/codec.c b/proton-c/src/codec/codec.c index 9370129..0660032 100644 --- a/proton-c/src/codec/codec.c +++ b/proton-c/src/codec/codec.c @@ -92,7 +92,7 @@ static void pn_data_finalize(void *object) pn_free(data->encoder); } -static pn_fields_t *pni_node_fields(pn_data_t *data, pni_node_t *node) +static const pn_fields_t *pni_node_fields(pn_data_t *data, pni_node_t *node) { if (!node) return NULL; if (node->atom.type != PN_DESCRIBED) return NULL; @@ -103,8 +103,9 @@ static pn_fields_t *pni_node_fields(pn_data_t *data, pni_node_t *node) return NULL; } - if (descriptor->atom.u.as_ulong < 256) { - return &FIELDS[descriptor->atom.u.as_ulong]; + if (descriptor->atom.u.as_ulong >= FIELD_MIN && descriptor->atom.u.as_ulong <= FIELD_MAX) { + const pn_fields_t *f = &FIELDS[descriptor->atom.u.as_ulong-FIELD_MIN]; + return (f->name_index!=0) ? f : NULL; } else { return NULL; } @@ -233,9 +234,16 @@ int pni_inspect_atom(pn_atom_t *atom, pn_string_t *str) if (quote) if ((err = pn_string_addf(str, "\""))) return err; return 0; } + case PN_LIST: + return pn_string_addf(str, "<list>"); + case PN_MAP: + return pn_string_addf(str, "<map>"); + case PN_ARRAY: + return pn_string_addf(str, "<array>"); + case PN_DESCRIBED: + return pn_string_addf(str, "<described>"); default: - assert(false); - return PN_ERR; + return pn_string_addf(str, "<undefined: %i>", atom->type); } } @@ -245,9 +253,9 @@ int pni_inspect_enter(void *ctx, pn_data_t *data, pni_node_t *node) pn_atom_t *atom = (pn_atom_t *) &node->atom; pni_node_t *parent = pn_data_node(data, node->parent); - pn_fields_t *fields = pni_node_fields(data, parent); + const pn_fields_t *fields = pni_node_fields(data, parent); pni_node_t *grandparent = parent ? pn_data_node(data, parent->parent) : NULL; - pn_fields_t *grandfields = pni_node_fields(data, grandparent); + const pn_fields_t *grandfields = pni_node_fields(data, grandparent); int index = pni_node_index(data, node); int err; @@ -256,7 +264,9 @@ int pni_inspect_enter(void *ctx, pn_data_t *data, pni_node_t *node) if (atom->type == PN_NULL) { return 0; } - const char *name = grandfields->fields[index]; + const char *name = (index < grandfields->field_count) + ? FIELD_STRINGPOOL+FIELD_FIELDS[grandfields->first_field_index+index] + : NULL; if (name) { err = pn_string_addf(str, "%s=", name); if (err) return err; @@ -275,7 +285,7 @@ int pni_inspect_enter(void *ctx, pn_data_t *data, pni_node_t *node) return pn_string_addf(str, "{"); default: if (fields && index == 0) { - err = pn_string_addf(str, "%s", fields->name); + err = pn_string_addf(str, "%s", FIELD_STRINGPOOL+FIELD_NAME[fields->name_index]); if (err) return err; err = pn_string_addf(str, "("); if (err) return err; @@ -305,7 +315,7 @@ int pni_inspect_exit(void *ctx, pn_data_t *data, pni_node_t *node) pn_string_t *str = (pn_string_t *) ctx; pni_node_t *parent = pn_data_node(data, node->parent); pni_node_t *grandparent = parent ? pn_data_node(data, parent->parent) : NULL; - pn_fields_t *grandfields = pni_node_fields(data, grandparent); + const pn_fields_t *grandfields = pni_node_fields(data, grandparent); pni_node_t *next = pn_data_node(data, node->next); int err; @@ -356,7 +366,7 @@ static int pn_data_inspect(void *obj, pn_string_t *dst) pn_data_t *pn_data(size_t capacity) { - static pn_class_t clazz = PN_CLASS(pn_data); + static const pn_class_t clazz = PN_CLASS(pn_data); pn_data_t *data = (pn_data_t *) pn_new(sizeof(pn_data_t), &clazz); data->capacity = capacity; data->size = 0; @@ -407,12 +417,12 @@ void pn_data_clear(pn_data_t *data) int pn_data_grow(pn_data_t *data) { - data->capacity = 2*(data->capacity ? data->capacity : 16); + data->capacity = 2*(data->capacity ? data->capacity : 2); data->nodes = (pni_node_t *) realloc(data->nodes, data->capacity * sizeof(pni_node_t)); return 0; } -ssize_t pn_data_intern(pn_data_t *data, char *start, size_t size) +ssize_t pn_data_intern(pn_data_t *data, const char *start, size_t size) { size_t offset = pn_buffer_size(data->buf); int err = pn_buffer_append(data->buf, start, size); @@ -454,7 +464,7 @@ int pn_data_intern_node(pn_data_t *data, pni_node_t *node) node->data = true; node->data_offset = offset; node->data_size = bytes->size; - pn_bytes_t buf = pn_buffer_bytes(data->buf); + pn_buffer_memory_t buf = pn_buffer_memory(data->buf); bytes->start = buf.start + offset; if (pn_buffer_capacity(data->buf) != oldcap) { @@ -1102,7 +1112,7 @@ int pn_data_resize(pn_data_t *data, size_t size) } -pni_node_t *pn_data_node(pn_data_t *data, size_t nd) +pni_node_t *pn_data_node(pn_data_t *data, pni_nid_t nd) { if (nd) { return &data->nodes[nd - 1]; @@ -1348,7 +1358,7 @@ bool pn_data_lookup(pn_data_t *data, const char *name) void pn_data_dump(pn_data_t *data) { - printf("{current=%" PN_ZI ", parent=%" PN_ZI "}\n", data->current, data->parent); + printf("{current=%" PN_ZI ", parent=%" PN_ZI "}\n", (size_t) data->current, (size_t) data->parent); for (unsigned i = 0; i < data->size; i++) { pni_node_t *node = &data->nodes[i]; @@ -1356,7 +1366,11 @@ void pn_data_dump(pn_data_t *data) pni_inspect_atom((pn_atom_t *) &node->atom, data->str); printf("Node %i: prev=%" PN_ZI ", next=%" PN_ZI ", parent=%" PN_ZI ", down=%" PN_ZI ", children=%" PN_ZI ", type=%s (%s)\n", - i + 1, node->prev, node->next, node->parent, node->down, node->children, + i + 1, (size_t) node->prev, + (size_t) node->next, + (size_t) node->parent, + (size_t) node->down, + (size_t) node->children, pn_type_name(node->atom.type), pn_string_get(data->str)); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/codec/data.h ---------------------------------------------------------------------- diff --git a/proton-c/src/codec/data.h b/proton-c/src/codec/data.h index 5ac8d45..be1669a 100644 --- a/proton-c/src/codec/data.h +++ b/proton-c/src/codec/data.h @@ -27,39 +27,41 @@ #include "decoder.h" #include "encoder.h" +typedef uint16_t pni_nid_t; + typedef struct { - size_t next; - size_t prev; - size_t down; - size_t parent; - size_t children; + char *start; + size_t data_offset; + size_t data_size; pn_atom_t atom; + pn_type_t type; + pni_nid_t next; + pni_nid_t prev; + pni_nid_t down; + pni_nid_t parent; + pni_nid_t children; // for arrays bool described; - pn_type_t type; bool data; - size_t data_offset; - size_t data_size; - char *start; bool small; } pni_node_t; struct pn_data_t { - size_t capacity; - size_t size; pni_node_t *nodes; pn_buffer_t *buf; - size_t parent; - size_t current; - size_t base_parent; - size_t base_current; pn_decoder_t *decoder; pn_encoder_t *encoder; pn_error_t *error; pn_string_t *str; + pni_nid_t capacity; + pni_nid_t size; + pni_nid_t parent; + pni_nid_t current; + pni_nid_t base_parent; + pni_nid_t base_current; }; -pni_node_t *pn_data_node(pn_data_t *data, size_t nd); +pni_node_t *pn_data_node(pn_data_t *data, pni_nid_t nd); int pni_data_traverse(pn_data_t *data, int (*enter)(void *ctx, pn_data_t *data, pni_node_t *node), int (*exit)(void *ctx, pn_data_t *data, pni_node_t *node), http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/codec/decoder.c ---------------------------------------------------------------------- diff --git a/proton-c/src/codec/decoder.c b/proton-c/src/codec/decoder.c index cfdd3b9..28e8ae1 100644 --- a/proton-c/src/codec/decoder.c +++ b/proton-c/src/codec/decoder.c @@ -54,7 +54,7 @@ static void pn_decoder_finalize(void *obj) { pn_decoder_t *pn_decoder() { - static pn_class_t clazz = PN_CLASS(pn_decoder); + static const pn_class_t clazz = PN_CLASS(pn_decoder); return (pn_decoder_t *) pn_new(sizeof(pn_decoder_t), &clazz); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/codec/encoder.c ---------------------------------------------------------------------- diff --git a/proton-c/src/codec/encoder.c b/proton-c/src/codec/encoder.c index 639d132..f0f3cef 100644 --- a/proton-c/src/codec/encoder.c +++ b/proton-c/src/codec/encoder.c @@ -56,7 +56,7 @@ static void pn_encoder_finalize(void *obj) { pn_encoder_t *pn_encoder() { - static pn_class_t clazz = PN_CLASS(pn_encoder); + static const pn_class_t clazz = PN_CLASS(pn_encoder); return (pn_encoder_t *) pn_new(sizeof(pn_encoder_t), &clazz); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/dispatch_actions.h ---------------------------------------------------------------------- diff --git a/proton-c/src/dispatch_actions.h b/proton-c/src/dispatch_actions.h new file mode 100644 index 0000000..aa7a8f4 --- /dev/null +++ b/proton-c/src/dispatch_actions.h @@ -0,0 +1,45 @@ +#ifndef _PROTON_DISPATCH_ACTIONS_H +#define _PROTON_DISPATCH_ACTIONS_H 1 + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "dispatcher/dispatcher.h" + +/* Transport actions */ +int pn_do_open(pn_dispatcher_t *disp); +int pn_do_begin(pn_dispatcher_t *disp); +int pn_do_attach(pn_dispatcher_t *disp); +int pn_do_transfer(pn_dispatcher_t *disp); +int pn_do_flow(pn_dispatcher_t *disp); +int pn_do_disposition(pn_dispatcher_t *disp); +int pn_do_detach(pn_dispatcher_t *disp); +int pn_do_end(pn_dispatcher_t *disp); +int pn_do_close(pn_dispatcher_t *disp); + +/* SASL actions */ +int pn_do_init(pn_dispatcher_t *disp); +int pn_do_mechanisms(pn_dispatcher_t *disp); +int pn_do_challenge(pn_dispatcher_t *disp); +int pn_do_response(pn_dispatcher_t *disp); +int pn_do_outcome(pn_dispatcher_t *disp); + +#endif // _PROTON_DISPATCH_ACTIONS_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/dispatcher/dispatcher.c ---------------------------------------------------------------------- diff --git a/proton-c/src/dispatcher/dispatcher.c b/proton-c/src/dispatcher/dispatcher.c index 3f3ee3c..296c3ab 100644 --- a/proton-c/src/dispatcher/dispatcher.c +++ b/proton-c/src/dispatcher/dispatcher.c @@ -30,6 +30,41 @@ #include "../util.h" #include "../platform_fmt.h" +#include "dispatch_actions.h" + +int pni_bad_frame(pn_dispatcher_t* disp) { + pn_transport_log(disp->transport, "Error dispatching frame: Unknown performative"); + return PN_ERR; +} + +// We could use a table based approach here if we needed to dynamically +// add new performatives +static inline int pni_dispatch_action(pn_dispatcher_t* disp, uint64_t lcode) +{ + pn_action_t *action; + switch (lcode) { + /* Regular AMQP fames */ + case OPEN: action = pn_do_open; break; + case BEGIN: action = pn_do_begin; break; + case ATTACH: action = pn_do_attach; break; + case FLOW: action = pn_do_flow; break; + case TRANSFER: action = pn_do_transfer; break; + case DISPOSITION: action = pn_do_disposition; break; + case DETACH: action = pn_do_detach; break; + case END: action = pn_do_end; break; + case CLOSE: action = pn_do_close; break; + + /* SASL frames */ + case SASL_MECHANISMS: action = pn_do_mechanisms; break; + case SASL_INIT: action = pn_do_init; break; + case SASL_CHALLENGE: action = pn_do_challenge; break; + case SASL_RESPONSE: action = pn_do_response; break; + case SASL_OUTCOME: action = pn_do_outcome; break; + default: action = pni_bad_frame; break; + }; + return action(disp); +} + pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport) { pn_dispatcher_t *disp = (pn_dispatcher_t *) calloc(sizeof(pn_dispatcher_t), 1); @@ -40,11 +75,7 @@ pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport) (pn_env_bool("PN_TRACE_FRM") ? PN_TRACE_FRM : PN_TRACE_OFF) | (pn_env_bool("PN_TRACE_DRV") ? PN_TRACE_DRV : PN_TRACE_OFF); - disp->input = pn_buffer(1024); - disp->fragment = 0; - disp->channel = 0; - disp->code = 0; disp->args = pn_data(16); disp->payload = NULL; disp->size = 0; @@ -67,7 +98,6 @@ pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport) void pn_dispatcher_free(pn_dispatcher_t *disp) { if (disp) { - pn_buffer_free(disp->input); pn_data_free(disp->args); pn_data_free(disp->output_args); pn_buffer_free(disp->frame); @@ -77,12 +107,6 @@ void pn_dispatcher_free(pn_dispatcher_t *disp) } } -void pn_dispatcher_action(pn_dispatcher_t *disp, uint8_t code, - pn_action_t *action) -{ - disp->actions[code] = action; -} - typedef enum {IN, OUT} pn_dir_t; static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir, @@ -92,6 +116,10 @@ static void pn_do_trace(pn_dispatcher_t *disp, uint16_t ch, pn_dir_t dir, pn_string_format(disp->scratch, "%u %s ", ch, dir == OUT ? "->" : "<-"); pn_inspect(args, disp->scratch); + if (pn_data_size(args)==0) { + pn_string_addf(disp->scratch, "(EMPTY FRAME)"); + } + if (size) { char buf[1024]; int e = pn_quote_data(buf, 1024, payload, size); @@ -122,7 +150,8 @@ int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame) } disp->channel = frame.channel; - // XXX: assuming numeric + // XXX: assuming numeric - + // if we get a symbol we should map it to the numeric value and dispatch on that uint64_t lcode; bool scanned; int e = pn_data_scan(disp->args, "D?L.", &scanned, &lcode); @@ -134,19 +163,15 @@ int pn_dispatch_frame(pn_dispatcher_t *disp, pn_frame_t frame) pn_transport_log(disp->transport, "Error dispatching frame"); return PN_ERR; } - uint8_t code = lcode; - disp->code = code; disp->size = frame.size - dsize; if (disp->size) disp->payload = frame.payload + dsize; pn_do_trace(disp, disp->channel, IN, disp->args, disp->payload, disp->size); - pn_action_t *action = disp->actions[code]; - int err = action(disp); + int err = pni_dispatch_action(disp, lcode); disp->channel = 0; - disp->code = 0; pn_data_clear(disp->args); disp->size = 0; disp->payload = NULL; @@ -212,7 +237,7 @@ int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...) encode_performatives: pn_buffer_clear( disp->frame ); - pn_bytes_t buf = pn_buffer_bytes( disp->frame ); + pn_buffer_memory_t buf = pn_buffer_memory( disp->frame ); buf.size = pn_buffer_available( disp->frame ); ssize_t wr = pn_data_encode( disp->output_args, buf.start, buf.size ); @@ -290,7 +315,7 @@ int pn_post_transfer_frame(pn_dispatcher_t *disp, uint16_t ch, encode_performatives: pn_buffer_clear( disp->frame ); - pn_bytes_t buf = pn_buffer_bytes( disp->frame ); + pn_buffer_memory_t buf = pn_buffer_memory( disp->frame ); buf.size = pn_buffer_available( disp->frame ); ssize_t wr = pn_data_encode(disp->output_args, buf.start, buf.size); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/dispatcher/dispatcher.h ---------------------------------------------------------------------- diff --git a/proton-c/src/dispatcher/dispatcher.h b/proton-c/src/dispatcher/dispatcher.h index 80b6412..a87e383 100644 --- a/proton-c/src/dispatcher/dispatcher.h +++ b/proton-c/src/dispatcher/dispatcher.h @@ -33,17 +33,7 @@ typedef struct pn_dispatcher_t pn_dispatcher_t; typedef int (pn_action_t)(pn_dispatcher_t *disp); -#define SCRATCH (1024) -#define CODEC_LIMIT (1024) - struct pn_dispatcher_t { - pn_action_t *actions[256]; - uint8_t frame_type; - pn_trace_t trace; - pn_buffer_t *input; - size_t fragment; - uint16_t channel; - uint8_t code; pn_data_t *args; const char *payload; size_t size; @@ -55,18 +45,19 @@ struct pn_dispatcher_t { size_t capacity; size_t available; /* number of raw bytes pending output */ char *output; - pn_transport_t *transport; - bool halt; - bool batch; + pn_transport_t *transport; // TODO: We keep this to get access to logging - perhaps move logging uint64_t output_frames_ct; uint64_t input_frames_ct; pn_string_t *scratch; + pn_trace_t trace; + uint16_t channel; + uint8_t frame_type; // Used when constructing outgoing frames + bool halt; + bool batch; }; pn_dispatcher_t *pn_dispatcher(uint8_t frame_type, pn_transport_t *transport); void pn_dispatcher_free(pn_dispatcher_t *disp); -void pn_dispatcher_action(pn_dispatcher_t *disp, uint8_t code, - pn_action_t *action); int pn_scan_args(pn_dispatcher_t *disp, const char *fmt, ...); void pn_set_payload(pn_dispatcher_t *disp, const char *data, size_t size); int pn_post_frame(pn_dispatcher_t *disp, uint16_t ch, const char *fmt, ...); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/engine/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h index d13cd58..03cb630 100644 --- a/proton-c/src/engine/engine-internal.h +++ b/proton-c/src/engine/engine-internal.h @@ -51,6 +51,7 @@ struct pn_endpoint_t { pn_endpoint_t *transport_prev; bool modified; bool freed; + bool posted_final; }; typedef struct { @@ -95,8 +96,6 @@ typedef struct { bool disp; } pn_session_state_t; -#define SCRATCH (1024) - #include <proton/sasl.h> #include <proton/ssl.h> @@ -111,21 +110,14 @@ typedef struct pn_io_layer_t { } pn_io_layer_t; struct pn_transport_t { - bool freed; pn_tracer_t tracer; size_t header_count; pn_sasl_t *sasl; pn_ssl_t *ssl; pn_connection_t *connection; // reference counted pn_dispatcher_t *disp; - bool open_sent; - bool open_rcvd; - bool close_sent; - bool close_rcvd; char *remote_container; char *remote_hostname; - uint16_t channel_max; - uint16_t remote_channel_max; pn_data_t *remote_offered_capabilities; pn_data_t *remote_desired_capabilities; pn_data_t *remote_properties; @@ -144,15 +136,14 @@ struct pn_transport_t { /* dead remote detection */ pn_millis_t local_idle_timeout; + pn_millis_t remote_idle_timeout; pn_timestamp_t dead_remote_deadline; uint64_t last_bytes_input; /* keepalive */ - pn_millis_t remote_idle_timeout; pn_timestamp_t keepalive_deadline; uint64_t last_bytes_output; - pn_error_t *error; pn_hash_t *local_channels; pn_hash_t *remote_channels; pn_string_t *scratch; @@ -166,14 +157,23 @@ struct pn_transport_t { size_t output_pending; char *output_buf; + void *context; + /* input from peer */ size_t input_size; size_t input_pending; char *input_buf; + + uint16_t channel_max; + uint16_t remote_channel_max; + bool freed; + bool open_sent; + bool open_rcvd; + bool close_sent; + bool close_rcvd; bool tail_closed; // input stream closed by driver bool head_closed; - - void *context; + bool done_processing; // if true, don't call pn_process again }; struct pn_connection_t { @@ -211,80 +211,80 @@ struct pn_session_t { }; struct pn_terminus_t { - pn_terminus_type_t type; pn_string_t *address; - pn_durability_t durability; - pn_expiry_policy_t expiry_policy; - pn_seconds_t timeout; - bool dynamic; - pn_distribution_mode_t distribution_mode; pn_data_t *properties; pn_data_t *capabilities; pn_data_t *outcomes; pn_data_t *filter; + pn_durability_t durability; + pn_expiry_policy_t expiry_policy; + pn_seconds_t timeout; + pn_terminus_type_t type; + pn_distribution_mode_t distribution_mode; + bool dynamic; }; struct pn_link_t { pn_endpoint_t endpoint; - pn_string_t *name; - pn_session_t *session; // reference counted pn_terminus_t source; pn_terminus_t target; pn_terminus_t remote_source; pn_terminus_t remote_target; + pn_link_state_t state; + pn_string_t *name; + pn_session_t *session; // reference counted pn_delivery_t *unsettled_head; pn_delivery_t *unsettled_tail; pn_delivery_t *current; pn_delivery_t *settled_head; pn_delivery_t *settled_tail; - uint8_t snd_settle_mode; - uint8_t rcv_settle_mode; - uint8_t remote_snd_settle_mode; - uint8_t remote_rcv_settle_mode; + void *context; size_t unsettled_count; pn_sequence_t available; pn_sequence_t credit; pn_sequence_t queued; + int drained; // number of drained credits + uint8_t snd_settle_mode; + uint8_t rcv_settle_mode; + uint8_t remote_snd_settle_mode; + uint8_t remote_rcv_settle_mode; bool drain_flag_mode; // receiver only bool drain; - int drained; // number of drained credits - void *context; - pn_link_state_t state; }; struct pn_disposition_t { + pn_condition_t condition; uint64_t type; pn_data_t *data; pn_data_t *annotations; - pn_condition_t condition; - uint32_t section_number; uint64_t section_offset; + uint32_t section_number; bool failed; bool undeliverable; bool settled; }; struct pn_delivery_t { - pn_link_t *link; // reference counted - pn_buffer_t *tag; pn_disposition_t local; pn_disposition_t remote; - bool updated; - bool settled; // tracks whether we're in the unsettled list or not + pn_link_t *link; // reference counted + pn_buffer_t *tag; pn_delivery_t *unsettled_next; pn_delivery_t *unsettled_prev; pn_delivery_t *settled_next; pn_delivery_t *settled_prev; pn_delivery_t *work_next; pn_delivery_t *work_prev; - bool work; pn_delivery_t *tpwork_next; pn_delivery_t *tpwork_prev; - bool tpwork; + pn_delivery_state_t state; pn_buffer_t *bytes; - bool done; void *context; - pn_delivery_state_t state; + bool updated; + bool settled; // tracks whether we're in the unsettled list or not + bool work; + bool tpwork; + bool done; }; #define PN_SET_LOCAL(OLD, NEW) \ @@ -310,5 +310,6 @@ void pn_clear_tpwork(pn_delivery_t *delivery); void pn_work_update(pn_connection_t *connection, pn_delivery_t *delivery); void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint); void pn_connection_unbound(pn_connection_t *conn); +int pn_do_error(pn_transport_t *transport, const char *condition, const char *fmt, ...); #endif /* engine-internal.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/engine/engine.c ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c index 718974a..02e5009 100644 --- a/proton-c/src/engine/engine.c +++ b/proton-c/src/engine/engine.c @@ -52,34 +52,18 @@ pn_connection_t *pn_ep_get_connection(pn_endpoint_t *endpoint) return NULL; } -/* map the endpoint type to its local event type */ -static const pn_event_type_t endpoint_event_map[] = { - PN_CONNECTION_LOCAL_STATE, /* CONNECTION */ - PN_SESSION_LOCAL_STATE, /* SESSION */ - PN_LINK_LOCAL_STATE, /* SENDER */ - PN_LINK_LOCAL_STATE}; /* RECEIVER */ - -/* setup the event given the endpoint that generated the event */ -static void endpoint_init_event(pn_event_t *event, - pn_endpoint_t *endpoint) -{ - switch (endpoint->type) { - case CONNECTION: { - pn_connection_t *conn = (pn_connection_t *) endpoint; - pn_event_init_connection(event, conn); - } - break; - case SESSION: { - pn_session_t *ssn = (pn_session_t *) endpoint; - pn_event_init_session(event, ssn); - } - break; +static pn_event_type_t endpoint_event(pn_endpoint_type_t type, bool open) { + switch (type) { + case CONNECTION: + return open ? PN_CONNECTION_OPEN : PN_CONNECTION_CLOSE; + case SESSION: + return open ? PN_SESSION_OPEN : PN_SESSION_CLOSE; case SENDER: - case RECEIVER: { - pn_link_t *link = (pn_link_t*) endpoint; - pn_event_init_link(event, link); - } - break; + case RECEIVER: + return open ? PN_LINK_OPEN : PN_LINK_CLOSE; + default: + assert(false); + return PN_EVENT_NONE; } } @@ -88,11 +72,8 @@ static void pn_endpoint_open(pn_endpoint_t *endpoint) // TODO: do we care about the current state? PN_SET_LOCAL(endpoint->state, PN_LOCAL_ACTIVE); pn_connection_t *conn = pn_ep_get_connection(endpoint); - pn_event_t *event = pn_collector_put(conn->collector, - endpoint_event_map[endpoint->type]); - if (event) { - endpoint_init_event(event, endpoint); - } + pn_collector_put(conn->collector, endpoint_event(endpoint->type, true), + endpoint); pn_modified(conn, endpoint, true); } @@ -101,11 +82,8 @@ static void pn_endpoint_close(pn_endpoint_t *endpoint) // TODO: do we care about the current state? PN_SET_LOCAL(endpoint->state, PN_LOCAL_CLOSED); pn_connection_t *conn = pn_ep_get_connection(endpoint); - pn_event_t *event = pn_collector_put(conn->collector, - endpoint_event_map[endpoint->type]); - if (event) { - endpoint_init_event(event, endpoint); - } + pn_collector_put(conn->collector, endpoint_event(endpoint->type, false), + endpoint); pn_modified(conn, endpoint, true); } @@ -133,8 +111,6 @@ void pn_endpoint_tini(pn_endpoint_t *endpoint); void pn_connection_free(pn_connection_t *connection) { assert(!connection->endpoint.freed); - if (pn_connection_state(connection) & PN_LOCAL_ACTIVE) - pn_connection_close(connection); // free those endpoints that haven't been freed by the application LL_REMOVE(connection, endpoint, &connection->endpoint); while (connection->endpoint_head) { @@ -200,7 +176,7 @@ void pn_condition_init(pn_condition_t *condition) { condition->name = pn_string(NULL); condition->description = pn_string(NULL); - condition->info = pn_data(16); + condition->info = pn_data(0); } void pn_condition_tini(pn_condition_t *condition) @@ -214,7 +190,7 @@ void pn_add_session(pn_connection_t *conn, pn_session_t *ssn) { pn_list_add(conn->sessions, ssn); ssn->connection = conn; - pn_incref(conn); // keep around until finalized + pn_incref2(conn, ssn); // keep around until finalized } void pn_remove_session(pn_connection_t *conn, pn_session_t *ssn) @@ -248,13 +224,11 @@ void pn_session_free(pn_session_t *session) pn_link_t *link = (pn_link_t *)pn_list_get(session->links, 0); pn_link_free(link); } - if (pn_session_state(session) & PN_LOCAL_ACTIVE) - pn_session_close(session); pn_remove_session(session->connection, session); pn_endpoint_t *endpoint = (pn_endpoint_t *) session; LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint); session->endpoint.freed = true; - pn_decref(session); + pn_decref2(session, session->connection); } void *pn_session_get_context(pn_session_t *session) @@ -304,8 +278,6 @@ void pn_terminus_free(pn_terminus_t *terminus) void pn_link_free(pn_link_t *link) { assert(!link->endpoint.freed); - if (pn_link_state(link) & PN_LOCAL_ACTIVE) - pn_link_close(link); pn_remove_link(link->session, link); pn_endpoint_t *endpoint = (pn_endpoint_t *) link; LL_REMOVE(pn_ep_get_connection(endpoint), endpoint, endpoint); @@ -318,10 +290,10 @@ void pn_link_free(pn_link_t *link) while (link->settled_head) { delivery = link->settled_head; LL_POP(link, settled, pn_delivery_t); - pn_decref(delivery); + pn_decref2(delivery, link); } link->endpoint.freed = true; - pn_decref(link); + pn_decref2(link, link->session); } void *pn_link_get_context(pn_link_t *link) @@ -348,6 +320,7 @@ void pn_endpoint_init(pn_endpoint_t *endpoint, int type, pn_connection_t *conn) endpoint->transport_prev = NULL; endpoint->modified = false; endpoint->freed = false; + endpoint->posted_final = false; LL_ADD(conn, endpoint, endpoint); } @@ -359,16 +332,37 @@ void pn_endpoint_tini(pn_endpoint_t *endpoint) pn_condition_tini(&endpoint->condition); } +#include "event.h" + +static bool pni_post_final(pn_endpoint_t *endpoint, pn_event_type_t type) +{ + pn_connection_t *conn = pn_ep_get_connection(endpoint); + if (!endpoint->posted_final) { + endpoint->posted_final = true; + pn_event_t *event = pn_collector_put(conn->collector, type, endpoint); + if (event) { return true; } + } + + return false; +} + static void pn_connection_finalize(void *object) { pn_connection_t *conn = (pn_connection_t *) object; + + pn_endpoint_t *endpoint = &conn->endpoint; + if (pni_post_final(endpoint, PN_CONNECTION_FINAL)) { + return; + } + + pn_decref2(conn->collector, conn); pn_free(conn->sessions); pn_free(conn->container); pn_free(conn->hostname); pn_free(conn->offered_capabilities); pn_free(conn->desired_capabilities); pn_free(conn->properties); - pn_endpoint_tini(&conn->endpoint); + pn_endpoint_tini(endpoint); } #define pn_connection_initialize NULL @@ -376,11 +370,9 @@ static void pn_connection_finalize(void *object) #define pn_connection_compare NULL #define pn_connection_inspect NULL -#include "event.h" - pn_connection_t *pn_connection() { - static pn_class_t clazz = PN_CLASS(pn_connection); + static const pn_class_t clazz = PN_CLASS(pn_connection); pn_connection_t *conn = (pn_connection_t *) pn_new(sizeof(pn_connection_t), &clazz); if (!conn) return NULL; @@ -398,17 +390,30 @@ pn_connection_t *pn_connection() conn->tpwork_tail = NULL; conn->container = pn_string(NULL); conn->hostname = pn_string(NULL); - conn->offered_capabilities = pn_data(16); - conn->desired_capabilities = pn_data(16); - conn->properties = pn_data(16); + conn->offered_capabilities = pn_data(0); + conn->desired_capabilities = pn_data(0); + conn->properties = pn_data(0); conn->collector = NULL; return conn; } +static const pn_event_type_t endpoint_init_event_map[] = { + PN_CONNECTION_INIT, /* CONNECTION */ + PN_SESSION_INIT, /* SESSION */ + PN_LINK_INIT, /* SENDER */ + PN_LINK_INIT}; /* RECEIVER */ + void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector) { + pn_decref2(connection->collector, connection); connection->collector = collector; + pn_incref2(connection->collector, connection); + pn_endpoint_t *endpoint = connection->endpoint_head; + while (endpoint) { + pn_collector_put(connection->collector, endpoint_init_event_map[endpoint->type], endpoint); + endpoint = endpoint->endpoint_next; + } } pn_state_t pn_connection_state(pn_connection_t *connection) @@ -556,7 +561,7 @@ void pn_add_tpwork(pn_delivery_t *delivery) { LL_ADD(connection, tpwork, delivery); delivery->tpwork = true; - pn_incref(delivery); + pn_incref2(delivery, connection); } pn_modified(connection, &connection->endpoint, true); } @@ -568,7 +573,7 @@ void pn_clear_tpwork(pn_delivery_t *delivery) { LL_REMOVE(connection, tpwork, delivery); delivery->tpwork = false; - pn_decref(delivery); // may free delivery! + pn_decref2(delivery, connection); // may free delivery! } } @@ -590,14 +595,12 @@ void pn_modified(pn_connection_t *connection, pn_endpoint_t *endpoint, bool emit if (!endpoint->modified) { LL_ADD(connection, transport, endpoint); endpoint->modified = true; - pn_incref(endpoint); + pn_incref2(endpoint, connection); } - if (emit) { - pn_event_t *event = pn_collector_put(connection->collector, PN_TRANSPORT); - if (event) { - pn_event_init_connection(event, connection); - } + if (emit && connection->transport) { + pn_collector_put(connection->collector, PN_TRANSPORT, + connection->transport); } } @@ -608,7 +611,7 @@ void pn_clear_modified(pn_connection_t *connection, pn_endpoint_t *endpoint) endpoint->transport_next = NULL; endpoint->transport_prev = NULL; endpoint->modified = false; - pn_decref(endpoint); // may free endpoint! + pn_decref2(endpoint, connection); // may free endpoint! } } @@ -687,6 +690,7 @@ pn_link_t *pn_link_next(pn_link_t *link, pn_state_t state) static void pn_session_finalize(void *object) { pn_session_t *session = (pn_session_t *) object; + pn_endpoint_t *endpoint = &session->endpoint; //pn_transport_t *transport = session->connection->transport; //if (transport) { /* if ((int16_t)session->state.local_channel >= 0) // END not sent */ @@ -695,13 +699,17 @@ static void pn_session_finalize(void *object) /* pn_unmap_channel(transport, session); */ /* } */ + if (pni_post_final(endpoint, PN_SESSION_FINAL)) { + return; + } + pn_free(session->links); - pn_endpoint_tini(&session->endpoint); + pn_endpoint_tini(endpoint); pn_delivery_map_free(&session->state.incoming); pn_delivery_map_free(&session->state.outgoing); pn_free(session->state.local_handles); pn_free(session->state.remote_handles); - pn_decref(session->connection); + pn_decref2(session->connection, session); } #define pn_session_initialize NULL @@ -712,8 +720,8 @@ static void pn_session_finalize(void *object) pn_session_t *pn_session(pn_connection_t *conn) { assert(conn); - static pn_class_t clazz = PN_CLASS(pn_session); - pn_session_t *ssn = (pn_session_t *) pn_new(sizeof(pn_session_t), &clazz); + static const pn_class_t clazz = PN_CLASS(pn_session); + pn_session_t *ssn = (pn_session_t *) pn_new2(sizeof(pn_session_t), &clazz, conn); if (!ssn) return NULL; pn_endpoint_init(&ssn->endpoint, SESSION, conn); @@ -736,6 +744,7 @@ pn_session_t *pn_session(pn_connection_t *conn) ssn->state.remote_handles = pn_hash(0, 0.75, PN_REFCOUNT); // end transport state + pn_collector_put(conn->collector, PN_SESSION_INIT, ssn); return ssn; } @@ -779,31 +788,36 @@ void pn_terminus_init(pn_terminus_t *terminus, pn_terminus_type_t type) terminus->type = type; terminus->address = pn_string(NULL); terminus->durability = PN_NONDURABLE; - terminus->expiry_policy = PN_SESSION_CLOSE; + terminus->expiry_policy = PN_EXPIRE_WITH_SESSION; terminus->timeout = 0; terminus->dynamic = false; terminus->distribution_mode = PN_DIST_MODE_UNSPECIFIED; - terminus->properties = pn_data(16); - terminus->capabilities = pn_data(16); - terminus->outcomes = pn_data(16); - terminus->filter = pn_data(16); + terminus->properties = pn_data(0); + terminus->capabilities = pn_data(0); + terminus->outcomes = pn_data(0); + terminus->filter = pn_data(0); } static void pn_link_finalize(void *object) { pn_link_t *link = (pn_link_t *) object; + pn_endpoint_t *endpoint = &link->endpoint; // assumptions: all deliveries freed assert(link->settled_head == NULL); assert(link->unsettled_head == NULL); + if (pni_post_final(endpoint, PN_LINK_FINAL)) { + return; + } + pn_terminus_free(&link->source); pn_terminus_free(&link->target); pn_terminus_free(&link->remote_source); pn_terminus_free(&link->remote_target); pn_free(link->name); - pn_endpoint_tini(&link->endpoint); - pn_decref(link->session); + pn_endpoint_tini(endpoint); + pn_decref2(link->session, link); } #define pn_link_initialize NULL @@ -813,12 +827,12 @@ static void pn_link_finalize(void *object) pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) { - static pn_class_t clazz = PN_CLASS(pn_link); - pn_link_t *link = (pn_link_t *) pn_new(sizeof(pn_link_t), &clazz); + static const pn_class_t clazz = PN_CLASS(pn_link); + pn_link_t *link = (pn_link_t *) pn_new2(sizeof(pn_link_t), &clazz, session); pn_endpoint_init(&link->endpoint, type, session->connection); pn_add_link(session, link); - pn_incref(session); // keep session until link finalized + pn_incref2(session, link); // keep session until link finalized link->name = pn_string(name); pn_terminus_init(&link->source, PN_SOURCE); pn_terminus_init(&link->target, PN_TARGET); @@ -844,8 +858,9 @@ pn_link_t *pn_link_new(int type, pn_session_t *session, const char *name) link->state.remote_handle = -1; link->state.delivery_count = 0; link->state.link_credit = 0; - // end transport stat + // end transport state + pn_collector_put(session->connection->collector, PN_LINK_INIT, link); return link; } @@ -1057,13 +1072,13 @@ static void pn_delivery_finalize(void *object) pn_buffer_free(delivery->bytes); pn_disposition_finalize(&delivery->local); pn_disposition_finalize(&delivery->remote); - pn_decref(delivery->link); + pn_decref2(delivery->link, delivery); } static void pn_disposition_init(pn_disposition_t *ds) { - ds->data = pn_data(16); - ds->annotations = pn_data(16); + ds->data = pn_data(0); + ds->annotations = pn_data(0); pn_condition_init(&ds->condition); } @@ -1091,11 +1106,11 @@ pn_delivery_t *pn_delivery(pn_link_t *link, pn_delivery_tag_t tag) pn_delivery_t *delivery = link->settled_head; LL_POP(link, settled, pn_delivery_t); if (!delivery) { - static pn_class_t clazz = PN_CLASS(pn_delivery); - delivery = (pn_delivery_t *) pn_new(sizeof(pn_delivery_t), &clazz); + static const pn_class_t clazz = PN_CLASS(pn_delivery); + delivery = (pn_delivery_t *) pn_new2(sizeof(pn_delivery_t), &clazz, link); if (!delivery) return NULL; delivery->link = link; - pn_incref(delivery->link); // keep link until finalized + pn_incref2(delivery->link, delivery); // keep link until finalized delivery->tag = pn_buffer(16); delivery->bytes = pn_buffer(64); pn_disposition_init(&delivery->local); @@ -1442,6 +1457,7 @@ ssize_t pn_link_send(pn_link_t *sender, const char *bytes, size_t n) { pn_delivery_t *current = pn_link_current(sender); if (!current) return PN_EOS; + if (!bytes || !n) return 0; pn_buffer_append(current->bytes, bytes, n); sender->session->outgoing_bytes += n; pn_add_tpwork(current); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/engine/event.c ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/event.c b/proton-c/src/engine/event.c index 9a3f220..07e3cb5 100644 --- a/proton-c/src/engine/event.c +++ b/proton-c/src/engine/event.c @@ -6,16 +6,13 @@ struct pn_collector_t { pn_event_t *head; pn_event_t *tail; pn_event_t *free_head; + bool freed; }; struct pn_event_t { - pn_event_type_t type; - pn_connection_t *connection; - pn_session_t *session; - pn_link_t *link; - pn_delivery_t *delivery; - pn_transport_t *transport; + void *context; // depends on type pn_event_t *next; + pn_event_type_t type; }; static void pn_collector_initialize(void *obj) @@ -24,25 +21,36 @@ static void pn_collector_initialize(void *obj) collector->head = NULL; collector->tail = NULL; collector->free_head = NULL; + collector->freed = false; } -static void pn_collector_finalize(void *obj) +static void pn_collector_drain(pn_collector_t *collector) { - pn_collector_t *collector = (pn_collector_t *) obj; - while (pn_collector_peek(collector)) { pn_collector_pop(collector); } assert(!collector->head); assert(!collector->tail); +} +static void pn_collector_shrink(pn_collector_t *collector) +{ pn_event_t *event = collector->free_head; while (event) { pn_event_t *next = event->next; pn_free(event); event = next; } + + collector->free_head = NULL; +} + +static void pn_collector_finalize(void *obj) +{ + pn_collector_t *collector = (pn_collector_t *) obj; + pn_collector_drain(collector); + pn_collector_shrink(collector); } static int pn_collector_inspect(void *obj, pn_string_t *dst) @@ -72,25 +80,39 @@ static int pn_collector_inspect(void *obj, pn_string_t *dst) pn_collector_t *pn_collector(void) { - static pn_class_t clazz = PN_CLASS(pn_collector); + static const pn_class_t clazz = PN_CLASS(pn_collector); pn_collector_t *collector = (pn_collector_t *) pn_new(sizeof(pn_collector_t), &clazz); return collector; } void pn_collector_free(pn_collector_t *collector) { - pn_free(collector); + collector->freed = true; + pn_collector_drain(collector); + pn_collector_shrink(collector); + pn_decref(collector); } pn_event_t *pn_event(void); static void pn_event_initialize(void *obj); -pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type) +pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type, void *context) { if (!collector) { return NULL; } + assert(context); + + if (collector->freed) { + return NULL; + } + + pn_event_t *tail = collector->tail; + if (tail && tail->type == type && tail->context == context) { + return NULL; + } + pn_event_t *event; if (collector->free_head) { @@ -101,8 +123,6 @@ pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type) event = pn_event(); } - pn_event_t *tail = collector->tail; - if (tail) { tail->next = event; collector->tail = event; @@ -112,26 +132,16 @@ pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type) } event->type = type; + event->context = context; + pn_incref2(event->context, collector); + + //printf("event %s on %p\n", pn_event_type_name(event->type), event->context); return event; } pn_event_t *pn_collector_peek(pn_collector_t *collector) { - // discard any events for objects that no longer exist - pn_event_t *event = collector->head; - while (event && ((event->delivery && event->delivery->local.settled) - || - (event->link && event->link->endpoint.freed) - || - (event->session && event->session->endpoint.freed) - || - (event->connection && event->connection->endpoint.freed) - || - (event->transport && event->transport->freed))) { - pn_collector_pop(collector); - event = collector->head; - } return collector->head; } @@ -148,15 +158,15 @@ bool pn_collector_pop(pn_collector_t *collector) collector->tail = NULL; } + // decref before adding to the free list + if (event->context) { + pn_decref2(event->context, collector); + event->context = NULL; + } + event->next = collector->free_head; collector->free_head = event; - if (event->connection) pn_decref(event->connection); - if (event->session) pn_decref(event->session); - if (event->link) pn_decref(event->link); - if (event->delivery) pn_decref(event->delivery); - if (event->transport) pn_decref(event->transport); - return true; } @@ -164,11 +174,7 @@ static void pn_event_initialize(void *obj) { pn_event_t *event = (pn_event_t *) obj; event->type = PN_EVENT_NONE; - event->connection = NULL; - event->session = NULL; - event->link = NULL; - event->delivery = NULL; - event->transport = NULL; + event->context = NULL; event->next = NULL; } @@ -178,16 +184,12 @@ static int pn_event_inspect(void *obj, pn_string_t *dst) { assert(obj); pn_event_t *event = (pn_event_t *) obj; - int err = pn_string_addf(dst, "(%d", event->type); - void *objects[] = {event->connection, event->session, event->link, - event->delivery, event->transport}; - for (int i = 0; i < 5; i++) { - if (objects[i]) { - err = pn_string_addf(dst, ", "); - if (err) return err; - err = pn_inspect(objects[i], dst); - if (err) return err; - } + int err = pn_string_addf(dst, "(0x%X", (unsigned int)event->type); + if (event->context) { + err = pn_string_addf(dst, ", "); + if (err) return err; + err = pn_inspect(event->context, dst); + if (err) return err; } return pn_string_addf(dst, ")"); @@ -198,45 +200,11 @@ static int pn_event_inspect(void *obj, pn_string_t *dst) pn_event_t *pn_event(void) { - static pn_class_t clazz = PN_CLASS(pn_event); + static const pn_class_t clazz = PN_CLASS(pn_event); pn_event_t *event = (pn_event_t *) pn_new(sizeof(pn_event_t), &clazz); return event; } -void pn_event_init_transport(pn_event_t *event, pn_transport_t *transport) -{ - event->transport = transport; - pn_incref(event->transport); -} - -void pn_event_init_connection(pn_event_t *event, pn_connection_t *connection) -{ - event->connection = connection; - pn_event_init_transport(event, event->connection->transport); - pn_incref(event->connection); -} - -void pn_event_init_session(pn_event_t *event, pn_session_t *session) -{ - event->session = session; - pn_event_init_connection(event, pn_session_connection(event->session)); - pn_incref(event->session); -} - -void pn_event_init_link(pn_event_t *event, pn_link_t *link) -{ - event->link = link; - pn_event_init_session(event, pn_link_session(event->link)); - pn_incref(event->link); -} - -void pn_event_init_delivery(pn_event_t *event, pn_delivery_t *delivery) -{ - event->delivery = delivery; - pn_event_init_link(event, pn_delivery_link(delivery)); - pn_incref(event->delivery); -} - pn_event_type_t pn_event_type(pn_event_t *event) { return event->type; @@ -247,29 +215,84 @@ pn_event_category_t pn_event_category(pn_event_t *event) return (pn_event_category_t)(event->type & 0xFFFF0000); } +void *pn_event_context(pn_event_t *event) +{ + assert(event); + return event->context; +} + pn_connection_t *pn_event_connection(pn_event_t *event) { - return event->connection; + pn_session_t *ssn; + pn_transport_t *transport; + + switch (pn_event_category(event)) { + case PN_EVENT_CATEGORY_CONNECTION: + return (pn_connection_t *)event->context; + case PN_EVENT_CATEGORY_TRANSPORT: + transport = pn_event_transport(event); + if (transport) + return transport->connection; + return NULL; + default: + ssn = pn_event_session(event); + if (ssn) + return pn_session_connection(ssn); + } + return NULL; } pn_session_t *pn_event_session(pn_event_t *event) { - return event->session; + pn_link_t *link; + switch (pn_event_category(event)) { + case PN_EVENT_CATEGORY_SESSION: + return (pn_session_t *)event->context; + default: + link = pn_event_link(event); + if (link) + return pn_link_session(link); + } + return NULL; } pn_link_t *pn_event_link(pn_event_t *event) { - return event->link; + pn_delivery_t *dlv; + switch (pn_event_category(event)) { + case PN_EVENT_CATEGORY_LINK: + return (pn_link_t *)event->context; + default: + dlv = pn_event_delivery(event); + if (dlv) + return pn_delivery_link(dlv); + } + return NULL; } pn_delivery_t *pn_event_delivery(pn_event_t *event) { - return event->delivery; + switch (pn_event_category(event)) { + case PN_EVENT_CATEGORY_DELIVERY: + return (pn_delivery_t *)event->context; + default: + return NULL; + } } pn_transport_t *pn_event_transport(pn_event_t *event) { - return event->transport; + switch (pn_event_category(event)) { + case PN_EVENT_CATEGORY_TRANSPORT: + return (pn_transport_t *)event->context; + default: + { + pn_connection_t *conn = pn_event_connection(event); + if (conn) + return pn_connection_transport(conn); + return NULL; + } + } } const char *pn_event_type_name(pn_event_type_t type) @@ -277,20 +300,44 @@ const char *pn_event_type_name(pn_event_type_t type) switch (type) { case PN_EVENT_NONE: return "PN_EVENT_NONE"; - case PN_CONNECTION_REMOTE_STATE: - return "PN_CONNECTION_REMOTE_STATE"; - case PN_CONNECTION_LOCAL_STATE: - return "PN_CONNECTION_LOCAL_STATE"; - case PN_SESSION_REMOTE_STATE: - return "PN_SESSION_REMOTE_STATE"; - case PN_SESSION_LOCAL_STATE: - return "PN_SESSION_LOCAL_STATE"; - case PN_LINK_REMOTE_STATE: - return "PN_LINK_REMOTE_STATE"; - case PN_LINK_LOCAL_STATE: - return "PN_LINK_LOCAL_STATE"; + case PN_CONNECTION_INIT: + return "PN_CONNECTION_INIT"; + case PN_CONNECTION_REMOTE_OPEN: + return "PN_CONNECTION_REMOTE_OPEN"; + case PN_CONNECTION_OPEN: + return "PN_CONNECTION_OPEN"; + case PN_CONNECTION_REMOTE_CLOSE: + return "PN_CONNECTION_REMOTE_CLOSE"; + case PN_CONNECTION_CLOSE: + return "PN_CONNECTION_CLOSE"; + case PN_CONNECTION_FINAL: + return "PN_CONNECTION_FINAL"; + case PN_SESSION_INIT: + return "PN_SESSION_INIT"; + case PN_SESSION_REMOTE_OPEN: + return "PN_SESSION_REMOTE_OPEN"; + case PN_SESSION_OPEN: + return "PN_SESSION_OPEN"; + case PN_SESSION_REMOTE_CLOSE: + return "PN_SESSION_REMOTE_CLOSE"; + case PN_SESSION_CLOSE: + return "PN_SESSION_CLOSE"; + case PN_SESSION_FINAL: + return "PN_SESSION_FINAL"; + case PN_LINK_INIT: + return "PN_LINK_INIT"; + case PN_LINK_REMOTE_OPEN: + return "PN_LINK_REMOTE_OPEN"; + case PN_LINK_OPEN: + return "PN_LINK_OPEN"; + case PN_LINK_REMOTE_CLOSE: + return "PN_LINK_REMOTE_CLOSE"; + case PN_LINK_CLOSE: + return "PN_LINK_CLOSE"; case PN_LINK_FLOW: return "PN_LINK_FLOW"; + case PN_LINK_FINAL: + return "PN_LINK_FINAL"; case PN_DELIVERY: return "PN_DELIVERY"; case PN_TRANSPORT: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/engine/event.h ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/event.h b/proton-c/src/engine/event.h index 80f3422..b05f2d0 100644 --- a/proton-c/src/engine/event.h +++ b/proton-c/src/engine/event.h @@ -22,12 +22,7 @@ * */ -pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type); - -void pn_event_init_transport(pn_event_t *event, pn_transport_t *transport); -void pn_event_init_connection(pn_event_t *event, pn_connection_t *connection); -void pn_event_init_session(pn_event_t *event, pn_session_t *session); -void pn_event_init_link(pn_event_t *event, pn_link_t *link); -void pn_event_init_delivery(pn_event_t *event, pn_delivery_t *delivery); +pn_event_t *pn_collector_put(pn_collector_t *collector, pn_event_type_t type, + void *context); #endif /* event.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/error.c ---------------------------------------------------------------------- diff --git a/proton-c/src/error.c b/proton-c/src/error.c index 77a3dc2..c3cf36a 100644 --- a/proton-c/src/error.c +++ b/proton-c/src/error.c @@ -27,9 +27,9 @@ #include "platform.h" struct pn_error_t { - int code; char *text; pn_error_t *root; + int code; }; pn_error_t *pn_error() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-c/src/message/message.c ---------------------------------------------------------------------- diff --git a/proton-c/src/message/message.c b/proton-c/src/message/message.c index 6c1088a..d91ab63 100644 --- a/proton-c/src/message/message.c +++ b/proton-c/src/message/message.c @@ -49,11 +49,8 @@ ssize_t pn_message_data(char *dst, size_t available, const char *src, size_t siz // message struct pn_message_t { - bool durable; - uint8_t priority; - pn_millis_t ttl; - bool first_acquirer; - uint32_t delivery_count; + pn_timestamp_t expiry_time; + pn_timestamp_t creation_time; pn_data_t *id; pn_string_t *user_id; pn_string_t *address; @@ -62,22 +59,28 @@ struct pn_message_t { pn_data_t *correlation_id; pn_string_t *content_type; pn_string_t *content_encoding; - pn_timestamp_t expiry_time; - pn_timestamp_t creation_time; pn_string_t *group_id; - pn_sequence_t group_sequence; pn_string_t *reply_to_group_id; - bool inferred; pn_data_t *data; pn_data_t *instructions; pn_data_t *annotations; pn_data_t *properties; pn_data_t *body; - pn_format_t format; pn_parser_t *parser; pn_error_t *error; + + pn_format_t format; + pn_sequence_t group_sequence; + pn_millis_t ttl; + uint32_t delivery_count; + + uint8_t priority; + + bool durable; + bool first_acquirer; + bool inferred; }; void pn_message_finalize(void *obj) @@ -318,7 +321,7 @@ int pn_message_inspect(void *obj, pn_string_t *dst) pn_message_t *pn_message() { - static pn_class_t clazz = PN_CLASS(pn_message); + static const pn_class_t clazz = PN_CLASS(pn_message); pn_message_t *msg = (pn_message_t *) pn_new(sizeof(pn_message_t), &clazz); msg->durable = false; msg->priority = PN_DEFAULT_PRIORITY; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
