http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/examples/engine/c/psend.c ---------------------------------------------------------------------- diff --git a/examples/engine/c/psend.c b/examples/engine/c/psend.c deleted file mode 100644 index 2ad0edb..0000000 --- a/examples/engine/c/psend.c +++ /dev/null @@ -1,373 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - - -/*################################################################ - This program is half of a pair. Precv and Psend are meant to - be simple-as-possible examples of how to use the proton-c - engine interface to send and receive messages over a single - connection and a single session. - - In addition to being examples, these programs or their - descendants will be used in performance regression testing - for both throughput and latency, and long-term soak testing. - - This program, psend, is highly similar to its peer precv. - I put all the good comments in precv. 
-*################################################################*/ - - -#include <unistd.h> -#include <stdlib.h> -#include <stdio.h> -#include <string.h> -#include <ctype.h> -#include <time.h> -#include <sys/time.h> -#define __STDC_FORMAT_MACROS -#include <inttypes.h> - - -#include <proton/connection.h> -#include <proton/delivery.h> -#include <proton/driver.h> -#include <proton/event.h> -#include <proton/terminus.h> -#include <proton/link.h> -#include <proton/message.h> -#include <proton/session.h> - - -#define MY_BUF_SIZE 1000 - - - -void -print_timestamp ( FILE * fp, char const * label ) -{ - struct timeval tv; - struct tm * timeinfo; - - gettimeofday ( & tv, 0 ); - timeinfo = localtime ( & tv.tv_sec ); - - int seconds_today = 3600 * timeinfo->tm_hour + - 60 * timeinfo->tm_min + - timeinfo->tm_sec; - - fprintf ( fp, "time : %d.%.6ld : %s\n", seconds_today, tv.tv_usec, label ); -} - - - - - -static -double -get_time ( ) -{ - struct timeval tv; - struct tm * timeinfo; - - gettimeofday ( & tv, 0 ); - timeinfo = localtime ( & tv.tv_sec ); - - double time_now = 3600 * timeinfo->tm_hour + - 60 * timeinfo->tm_min + - timeinfo->tm_sec; - - time_now += ((double)(tv.tv_usec) / 1000000.0); - return time_now; -} - - - - - -int -main ( int argc, char ** argv ) -{ - char addr [ 1000 ]; - char host [ 1000 ]; - char port [ 1000 ]; - char output_file_name[1000]; - - - uint64_t messages = 2000000, - delivery_count = 0; - - int message_length = 100; - - bool done = false; - int sent_count = 0; - int n_links = 5; - int const max_links = 100; - - strcpy ( addr, "queue" ); - strcpy ( host, "0.0.0.0" ); - strcpy ( port, "5672" ); - - FILE * output_fp; - - - for ( int i = 1; i < argc; ++ i ) - { - if ( ! strcmp ( "--host", argv[i] ) ) - { - strcpy ( host, argv[i+1] ); - ++ i; - } - else - if ( ! strcmp ( "--port", argv[i] ) ) - { - strcpy ( port, argv[i+1] ); - ++ i; - } - else - if ( ! 
strcmp ( "--messages", argv[i] ) ) - { - sscanf ( argv [ i+1 ], "%" SCNd64 , & messages ); - ++ i; - } - else - if ( ! strcmp ( "--message_length", argv[i] ) ) - { - sscanf ( argv [ i+1 ], "%d", & message_length ); - ++ i; - } - else - if ( ! strcmp ( "--n_links", argv[i] ) ) - { - sscanf ( argv [ i+1 ], "%d", & n_links ); - ++ i; - } - if ( ! strcmp ( "--output", argv[i] ) ) - { - if ( ! strcmp ( "stderr", argv[i+1] ) ) - { - output_fp = stderr; - strcpy ( output_file_name, "stderr"); - } - else - if ( ! strcmp ( "stdout", argv[i+1] ) ) - { - output_fp = stdout; - strcpy ( output_file_name, "stdout"); - } - else - { - output_fp = fopen ( argv[i+1], "w" ); - strcpy ( output_file_name, argv[i+1] ); - if ( ! output_fp ) - { - fprintf ( stderr, "Can't open |%s| for writing.\n", argv[i+1] ); - exit ( 1 ); - } - } - ++ i; - } - else - { - fprintf ( output_fp, "unknown arg %s", argv[i] ); - } - } - - - fprintf ( output_fp, "host %s\n", host ); - fprintf ( output_fp, "port %s\n", port ); - fprintf ( output_fp, "messages %" PRId64 "\n", messages ); - fprintf ( output_fp, "message_length %d\n", message_length ); - fprintf ( output_fp, "n_links %d\n", n_links ); - fprintf ( output_fp, "output %s\n", output_file_name ); - - - if ( n_links > max_links ) - { - fprintf ( output_fp, "You can't have more than %d links.\n", max_links ); - exit ( 1 ); - } - - pn_driver_t * driver; - pn_connector_t * connector; - pn_connector_t * driver_connector; - pn_connection_t * connection; - pn_collector_t * collector; - pn_link_t * links [ max_links ]; - pn_session_t * session; - pn_event_t * event; - pn_delivery_t * delivery; - - - char * message = (char *) malloc(message_length); - memset ( message, 13, message_length ); - - /*---------------------------------------------------- - Get everything set up. - We will have a single connector, a single - connection, a single session, and a single link. 
- ----------------------------------------------------*/ - driver = pn_driver ( ); - connector = pn_connector ( driver, host, port, 0 ); - - connection = pn_connection(); - collector = pn_collector ( ); - pn_connection_collect ( connection, collector ); - pn_connector_set_connection ( connector, connection ); - - session = pn_session ( connection ); - pn_connection_open ( connection ); - pn_session_open ( session ); - - for ( int i = 0; i < n_links; ++ i ) - { - char name[100]; - sprintf ( name, "tvc_15_%d", i ); - links[i] = pn_sender ( session, name ); - pn_terminus_set_address ( pn_link_target(links[i]), addr ); - pn_link_open ( links[i] ); - } - - /*----------------------------------------------------------- - For my speed tests, I do not want to count setup time. - Start timing here. The receiver will print out a similar - timestamp when he receives the final message. - -----------------------------------------------------------*/ - fprintf ( output_fp, "psend: sending %llu messages.\n", messages ); - - // Just before we start sending, print the start timestamp. - fprintf ( output_fp, "psend_start %.3lf\n", get_time() ); - - while ( 1 ) - { - pn_driver_wait ( driver, -1 ); - - int event_count = 1; - while ( event_count > 0 ) - { - event_count = 0; - pn_connector_process ( connector ); - - event = pn_collector_peek(collector); - while ( event ) - { - ++ event_count; - pn_event_type_t event_type = pn_event_type ( event ); - //fprintf ( output_fp, "event: %s\n", pn_event_type_name ( event_type ) ); - - switch ( event_type ) - { - case PN_LINK_FLOW: - { - if ( delivery_count < messages ) - { - /*--------------------------------------------------- - We may have opened multiple links. - The event will tell us which one this flow-event - happened on. If the flow event gave us some - credit, we will greedily send messages until it - is all used up. 
- ---------------------------------------------------*/ - pn_link_t * link = pn_event_link ( event ); - int credit = pn_link_credit ( link ); - - while ( credit > 0 ) - { - // Every delivery we create needs a unique tag. - char str [ 100 ]; - sprintf ( str, "%x", delivery_count ++ ); - delivery = pn_delivery ( link, pn_dtag(str, strlen(str)) ); - - // If you settle the delivery before sending it, - // you will spend some time wondering why your - // messages don't have any content when they arrive - // at the receiver. - pn_link_send ( link, message, message_length ); - pn_delivery_settle ( delivery ); - pn_link_advance ( link ); - credit = pn_link_credit ( link ); - } - - if ( delivery_count >= messages ) - { - fprintf ( output_fp, - "I have sent all %d messages.\n" , - delivery_count - ); - /* - I'm still kind of hazy on how to shut down the - psend / precv system properly .... - I can't go to all_done here, or precv will never - get all its messages and terminate. - So I let precv terminate properly ... which means - that this program, psend, dies with an error. - Hmm. - */ - // goto all_done; - } - } - } - break; - - - case PN_TRANSPORT: - // I don't know what this means here, either. - break; - - - case PN_TRANSPORT_TAIL_CLOSED: - goto all_done; - break; - - - default: - /* - fprintf ( output_fp, - "precv unhandled event: %s\n", - pn_event_type_name(event_type) - ); - */ - break; - - } - - pn_collector_pop ( collector ); - event = pn_collector_peek(collector); - } - } - } - - all_done: - - for ( int i = 0; i < n_links; ++ i ) - { - pn_link_close ( links[i] ); - } - - pn_session_close ( session ); - pn_connection_close ( connection ); - - return 0; -} - - - - -
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/examples/exampletest.py ---------------------------------------------------------------------- diff --git a/examples/exampletest.py b/examples/exampletest.py new file mode 100644 index 0000000..d40b9cb --- /dev/null +++ b/examples/exampletest.py @@ -0,0 +1,183 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License +# + +# A test library to make it easy to run unittest tests that start, +# monitor, and report output from sub-processes. In particular +# it helps with starting processes that listen on random ports. 
+ +import unittest +import os, sys, socket, time, re, inspect, errno, threading +from random import randrange +from subprocess import Popen, PIPE, STDOUT +from copy import copy +import platform +from os.path import dirname as dirname + +def pick_port(): + """Pick a random port.""" + p = randrange(10000, 20000) + return p + +class ProcError(Exception): + """An exception that captures failed process output""" + def __init__(self, proc, what="bad exit status"): + out = proc.out.strip() + if out: + out = "\nvvvvvvvvvvvvvvvv\n%s\n^^^^^^^^^^^^^^^^\n" % out + else: + out = ", no output)" + super(Exception, self, ).__init__( + "%s %s, code=%s%s" % (proc.args, what, proc.returncode, out)) + +class NotFoundError(ProcError): + pass + +class Proc(Popen): + """A example process that stores its output, optionally run with valgrind.""" + + if "VALGRIND" in os.environ and os.environ["VALGRIND"]: + env_args = [os.environ["VALGRIND"], "--error-exitcode=42", "--quiet", "--leak-check=full"] + else: + env_args = [] + + @property + def out(self): + self._out.seek(0) + return self._out.read() + + def __init__(self, args, **kwargs): + """Start an example process""" + args = list(args) + self.args = args + self._out = os.tmpfile() + try: + Popen.__init__(self, self.env_args + self.args, stdout=self._out, stderr=STDOUT, **kwargs) + except OSError, e: + if e.errno == errno.ENOENT: + raise NotFoundError(self, str(e)) + raise ProcError(self, str(e)) + except Exception, e: + raise ProcError(self, str(e)) + + def kill(self): + try: + if self.poll() is None: + Popen.kill(self) + except: + pass # Already exited. + return self.out + + def wait_out(self, timeout=10, expect=0): + """Wait for process to exit, return output. 
Raise ProcError on failure.""" + t = threading.Thread(target=self.wait) + t.start() + t.join(timeout) + if self.poll() is None: # Still running + self.kill() + raise ProcError(self, "timeout") + if expect is not None and self.poll() != expect: + raise ProcError(self) + return self.out + +# Work-around older python unittest that lacks setUpClass. +if hasattr(unittest.TestCase, 'setUpClass') and hasattr(unittest.TestCase, 'tearDownClass'): + TestCase = unittest.TestCase +else: + class TestCase(unittest.TestCase): + """ + Roughly provides setUpClass and tearDownClass functionality for older python + versions in our test scenarios. If subclasses override setUp or tearDown + they *must* call the superclass. + """ + def setUp(self): + if not hasattr(type(self), '_setup_class_count'): + type(self)._setup_class_count = len( + inspect.getmembers( + type(self), + predicate=lambda(m): inspect.ismethod(m) and m.__name__.startswith('test_'))) + type(self).setUpClass() + + def tearDown(self): + self.assertTrue(self._setup_class_count > 0) + self._setup_class_count -= 1 + if self._setup_class_count == 0: + type(self).tearDownClass() + +class ExampleTestCase(TestCase): + """TestCase that manages started processes""" + def setUp(self): + super(ExampleTestCase, self).setUp() + self.procs = [] + + def tearDown(self): + for p in self.procs: + p.kill() + super(ExampleTestCase, self).tearDown() + + def proc(self, *args, **kwargs): + p = Proc(*args, **kwargs) + self.procs.append(p) + return p + +def wait_port(port, timeout=10): + """Wait up to timeout for port to be connectable.""" + if timeout: + deadline = time.time() + timeout + while (timeout is None or time.time() < deadline): + try: + s = socket.create_connection((None, port), timeout) # Works for IPv6 and v4 + s.close() + return + except socket.error, e: + if e.errno != errno.ECONNREFUSED: # Only retry on connection refused error. 
+ raise + raise socket.timeout() + + +class BrokerTestCase(ExampleTestCase): + """ + ExampleTest that starts a broker in setUpClass and kills it in tearDownClass. + Subclass must set `broker_exe` class variable with the name of the broker executable. + """ + + @classmethod + def setUpClass(cls): + cls.port = pick_port() + cls.addr = "127.0.0.1:%s/examples" % (cls.port) + cls.broker = None # In case Proc throws, create the attribute. + cls.broker = Proc(cls.broker_exe + ["-a", cls.addr]) + try: + wait_port(cls.port) + except Exception, e: + cls.broker.kill() + raise ProcError(cls.broker, "timed out waiting for port") + + @classmethod + def tearDownClass(cls): + if cls.broker: cls.broker.kill() + + def tearDown(self): + b = type(self).broker + if b and b.poll() != None: # Broker crashed + type(self).setUpClass() # Start another for the next test. + raise ProcError(b, "broker crash") + super(BrokerTestCase, self).tearDown() + +if __name__ == "__main__": + unittest.main() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp index 70e6754..d9825c2 100644 --- a/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp +++ b/proton-c/bindings/cpp/include/proton/io/connection_engine.hpp @@ -121,8 +121,8 @@ PN_CPP_CLASS_EXTERN connection_engine { /// Configure a connection by applying exactly the options in opts (including proton::messaging_handler) /// Does not apply any default options, to apply container defaults use connect() or accept() - /// instead. - void configure(const connection_options& opts=connection_options()); + /// instead. If server==true, configure a server connection. 
+ void configure(const connection_options& opts=connection_options(), bool server=false); /// Call configure() with client options and call connection::open() /// Options applied: container::id(), container::client_connection_options(), opts. @@ -200,12 +200,13 @@ PN_CPP_CLASS_EXTERN connection_engine { PN_CPP_EXTERN proton::container* container() const; private: + void init(); connection_engine(const connection_engine&); connection_engine& operator=(const connection_engine&); messaging_handler* handler_; proton::container* container_; - pn_connection_engine_t c_engine_; + pn_connection_engine_t engine_; }; } // io http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/bindings/cpp/include/proton/sender_options.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/sender_options.hpp b/proton-c/bindings/cpp/include/proton/sender_options.hpp index 64fea2f..0a618ff 100644 --- a/proton-c/bindings/cpp/include/proton/sender_options.hpp +++ b/proton-c/bindings/cpp/include/proton/sender_options.hpp @@ -90,10 +90,10 @@ class sender_options { PN_CPP_EXTERN sender_options& auto_settle(bool); /// Options for the source node of the sender. - PN_CPP_EXTERN sender_options& source(source_options &); + PN_CPP_EXTERN sender_options& source(const source_options &); /// Options for the receiver node of the receiver. 
- PN_CPP_EXTERN sender_options& target(target_options &); + PN_CPP_EXTERN sender_options& target(const target_options &); /// @cond INTERNAL private: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/bindings/cpp/src/io/connection_engine.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp index 4712b3e..5e6483f 100644 --- a/proton-c/bindings/cpp/src/io/connection_engine.cpp +++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp @@ -40,35 +40,34 @@ namespace proton { namespace io { -connection_engine::connection_engine() : - container_(0) -{ - int err; - if ((err = pn_connection_engine_init(&c_engine_))) - throw proton::error(std::string("connection_engine init failed: ")+pn_code(err)); -} - -connection_engine::connection_engine(class container& cont, event_loop* loop) : - container_(&cont) -{ - int err; - if ((err = pn_connection_engine_init(&c_engine_))) - throw proton::error(std::string("connection_engine init failed: ")+pn_code(err)); +void connection_engine::init() { + if (pn_connection_engine_init(&engine_, pn_connection(), pn_transport()) != 0) { + this->~connection_engine(); // Dtor won't be called on throw from ctor. 
+ throw proton::error(std::string("connection_engine allocation failed")); + } +} + +connection_engine::connection_engine() : handler_(0), container_(0) { init(); } + +connection_engine::connection_engine(class container& cont, event_loop* loop) : handler_(0), container_(&cont) { + init(); connection_context& ctx = connection_context::get(connection()); ctx.container = container_; ctx.event_loop.reset(loop); } -void connection_engine::configure(const connection_options& opts) { - proton::connection c = connection(); +connection_engine::~connection_engine() { + pn_connection_engine_destroy(&engine_); +} + +void connection_engine::configure(const connection_options& opts, bool server) { + proton::connection c(connection()); opts.apply_unbound(c); + if (server) pn_transport_set_server(engine_.transport); + pn_connection_engine_bind(&engine_); opts.apply_bound(c); handler_ = opts.handler(); - connection_context::get(connection()).collector = c_engine_.collector; -} - -connection_engine::~connection_engine() { - pn_connection_engine_final(&c_engine_); + connection_context::get(connection()).collector = engine_.collector; } void connection_engine::connect(const connection_options& opts) { @@ -78,7 +77,7 @@ void connection_engine::connect(const connection_options& opts) { all.update(container_->client_connection_options()); } all.update(opts); - configure(all); + configure(all, false); connection().open(); } @@ -89,12 +88,12 @@ void connection_engine::accept(const connection_options& opts) { all.update(container_->server_connection_options()); } all.update(opts); - configure(all); + configure(all, true); } bool connection_engine::dispatch() { pn_event_t* c_event; - while ((c_event = pn_connection_engine_dispatch(&c_engine_)) != NULL) { + while ((c_event = pn_connection_engine_event(&engine_)) != NULL) { proton_event cpp_event(c_event, container_); try { if (handler_ != 0) { @@ -102,51 +101,56 @@ bool connection_engine::dispatch() { cpp_event.dispatch(adapter); } } catch 
(const std::exception& e) { - disconnected(error_condition("exception", e.what())); + pn_condition_t *cond = pn_transport_condition(engine_.transport); + if (!pn_condition_is_set(cond)) { + pn_condition_format(cond, "exception", "%s", e.what()); + } } + pn_connection_engine_pop_event(&engine_); } - return !pn_connection_engine_finished(&c_engine_); + return !pn_connection_engine_finished(&engine_); } mutable_buffer connection_engine::read_buffer() { - pn_rwbytes_t buffer = pn_connection_engine_read_buffer(&c_engine_); + pn_rwbytes_t buffer = pn_connection_engine_read_buffer(&engine_); return mutable_buffer(buffer.start, buffer.size); } void connection_engine::read_done(size_t n) { - return pn_connection_engine_read_done(&c_engine_, n); + return pn_connection_engine_read_done(&engine_, n); } void connection_engine::read_close() { - pn_connection_engine_read_close(&c_engine_); + pn_connection_engine_read_close(&engine_); } const_buffer connection_engine::write_buffer() { - pn_bytes_t buffer = pn_connection_engine_write_buffer(&c_engine_); + pn_bytes_t buffer = pn_connection_engine_write_buffer(&engine_); return const_buffer(buffer.start, buffer.size); } void connection_engine::write_done(size_t n) { - return pn_connection_engine_write_done(&c_engine_, n); + return pn_connection_engine_write_done(&engine_, n); } void connection_engine::write_close() { - pn_connection_engine_write_close(&c_engine_); + pn_connection_engine_write_close(&engine_); } void connection_engine::disconnected(const proton::error_condition& err) { - pn_condition_t* condition = pn_connection_engine_condition(&c_engine_); - if (!pn_condition_is_set(condition)) // Don't overwrite existing condition + pn_condition_t* condition = pn_transport_condition(engine_.transport); + if (!pn_condition_is_set(condition)) { set_error_condition(err, condition); - pn_connection_engine_disconnected(&c_engine_); + } + pn_connection_engine_close(&engine_); } proton::connection connection_engine::connection() const { - 
return make_wrapper(c_engine_.connection); + return make_wrapper(engine_.connection); } proton::transport connection_engine::transport() const { - return make_wrapper(c_engine_.transport); + return make_wrapper(engine_.transport); } proton::container* connection_engine::container() const { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/bindings/cpp/src/sender_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/sender_options.cpp b/proton-c/bindings/cpp/src/sender_options.cpp index f5d5525..bed4a69 100644 --- a/proton-c/bindings/cpp/src/sender_options.cpp +++ b/proton-c/bindings/cpp/src/sender_options.cpp @@ -108,8 +108,8 @@ void sender_options::update(const sender_options& x) { impl_->update(*x.impl_); sender_options& sender_options::handler(class messaging_handler &h) { impl_->handler = &h; return *this; } sender_options& sender_options::delivery_mode(proton::delivery_mode m) {impl_->delivery_mode = m; return *this; } sender_options& sender_options::auto_settle(bool b) {impl_->auto_settle = b; return *this; } -sender_options& sender_options::source(source_options &s) {impl_->source = s; return *this; } -sender_options& sender_options::target(target_options &s) {impl_->target = s; return *this; } +sender_options& sender_options::source(const source_options &s) {impl_->source = s; return *this; } +sender_options& sender_options::target(const target_options &s) {impl_->target = s; return *this; } void sender_options::apply(sender& s) const { impl_->apply(s); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/docs/api/index.md ---------------------------------------------------------------------- diff --git a/proton-c/docs/api/index.md b/proton-c/docs/api/index.md index 10aea84..ccd679d 100644 --- a/proton-c/docs/api/index.md +++ b/proton-c/docs/api/index.md @@ -1,5 +1,39 @@ Proton Documentation {#index} ==================== -The proton library 
contains two APIs: The [Engine API](@ref engine), -and the [Messenger API](@ref messenger). +## The Protocol Engine + +The [Engine API](@ref engine) is a "pure AMQP" toolkit: it decodes AMQP bytes +into proton [events](@ref event) and generates AMQP bytes from application +calls. + +The [connection engine](@ref connection_engine) provides a simple bytes in/bytes +out, event-driven interface so you can read AMQP data from any source, process +the resulting [events](@ref event) and write AMQP output to any destination. + +There is no IO or threading code in this part of the library, so it can be +embedded in many different environments. The proton project provides language +bindings (Python, Ruby, Go etc.) that embed it into the standard IO and +threading facilities of the bound language. + +## Integrating with IO + +The [Proactor API](@ref proactor) is a pro-active, asynchronous framework to +build single or multi-threaded Proton C applications. + +For advanced use-cases it is possible to write your own implementation of the +proactor API for an unusual IO or threading framework. Any proton application +written to the proactor API will be able to use your implementation. + +## Messenger and Reactor APIs + +The [Messenger](@ref messenger) and [Reactor](@ref reactor) APIs were intended +to be simple APIs that included IO support directly out of the box. + +They both had good points but were both based on the assumption of a single-threaded +environment using a POSIX-like poll() call. This was a problem for performance on some +platforms and did not support multi-threaded applications. + +Note however that application code which interacts with the AMQP @ref engine and +processes AMQP @ref event "events" is the same for the proactor and reactor APIs, +so it is quite easy to convert. The main difference is in how connections are set up. 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/include/proton/cid.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/cid.h b/proton-c/include/proton/cid.h index 1e4715f..cd68de4 100644 --- a/proton-c/include/proton/cid.h +++ b/proton-c/include/proton/cid.h @@ -56,7 +56,10 @@ typedef enum { CID_pn_selector, CID_pn_selectable, - CID_pn_url + CID_pn_url, + + CID_pn_listener, + CID_pn_proactor } pn_cid_t; #endif /* cid.h */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/include/proton/condition.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/condition.h b/proton-c/include/proton/condition.h index cf2f8aa..ae2beff 100644 --- a/proton-c/include/proton/condition.h +++ b/proton-c/include/proton/condition.h @@ -165,6 +165,10 @@ PN_EXTERN const char *pn_condition_redirect_host(pn_condition_t *condition); */ PN_EXTERN int pn_condition_redirect_port(pn_condition_t *condition); +PN_EXTERN int pn_condition_copy(pn_condition_t *dest, pn_condition_t *src); +PN_EXTERN pn_condition_t *pn_condition(void); +PN_EXTERN void pn_condition_free(pn_condition_t *); + /** @} */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/include/proton/connection.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/connection.h b/proton-c/include/proton/connection.h index c5b5490..0ed23b0 100644 --- a/proton-c/include/proton/connection.h +++ b/proton-c/include/proton/connection.h @@ -82,6 +82,7 @@ extern "C" { */ #define PN_REMOTE_MASK (PN_REMOTE_UNINIT | PN_REMOTE_ACTIVE | PN_REMOTE_CLOSED) +PN_EXTERN pn_connection_t *pn_connection(void); /** * Factory to construct a new Connection. 
 * @@ -148,6 +149,13 @@ PN_EXTERN pn_error_t *pn_connection_error(pn_connection_t *connection); PN_EXTERN void pn_connection_collect(pn_connection_t *connection, pn_collector_t *collector); /** + * Get the collector set with pn_connection_collect() + * @return NULL if pn_connection_collect() has not been called. +*/ +PN_EXTERN pn_collector_t* pn_connection_collector(pn_connection_t *connection); + + +/** * @deprecated * Get the application context that is associated with a connection * object. @@ -477,6 +485,16 @@ PN_EXTERN pn_data_t *pn_connection_remote_properties(pn_connection_t *connection */ PN_EXTERN pn_transport_t *pn_connection_transport(pn_connection_t *connection); +/** + * Create a connection with `size` bytes of extra aligned storage in the same heap block. + */ +PN_EXTERN pn_connection_t* pn_connection_with_extra(size_t size); + +/** + * Get the start and size of extra storage allocated by pn_connection_with_extra() + */ +PN_EXTERN pn_rwbytes_t pn_connection_get_extra(pn_connection_t *connection); + /** @} */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/include/proton/connection_engine.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/connection_engine.h b/proton-c/include/proton/connection_engine.h index d9df77b..b7022a9 100644 --- a/proton-c/include/proton/connection_engine.h +++ b/proton-c/include/proton/connection_engine.h @@ -20,160 +20,289 @@ * under the License. */ -///@file -/// -/// **Experimental** The Connection Engine API wraps up the proton engine -/// objects associated with a single connection: pn_connection_t, pn_transport_t -/// and pn_collector_t. It provides a simple bytes-in/bytes-out interface for IO -/// and generates pn_event_t events to be handled by the application. -/// -/// The connection engine can be fed with raw AMQP bytes from any source, and it -/// generates AMQP byte output to be written to any destination. 
You can use the -/// engine to integrate proton AMQP with any IO library, or native IO on any -/// platform. -/// -/// The engine is not thread safe but each engine is independent. Separate -/// engines can be used concurrently. For example a multi-threaded application -/// can process connections in multiple threads, but serialize work for each -/// connection to the corresponding engine. -/// -/// The engine is designed to be thread and IO neutral so it can be integrated with -/// single or multi-threaded code in reactive or proactive IO frameworks. -/// -/// Summary of use: -/// -/// - while !pn_connection_engine_finished() -/// - Call pn_connection_engine_dispatch() to dispatch events until it returns NULL. -/// - Read data from your source into pn_connection_engine_read_buffer() -/// - Call pn_connection_engine_read_done() when complete. -/// - Write data from pn_connection_engine_write_buffer() to your destination. -/// - Call pn_connection_engine_write_done() to indicate how much was written. -/// -/// Note on blocking: the _read/write_buffer and _read/write_done functions can -/// all generate events that may cause the engine to finish. Before you wait for -/// IO, always drain pn_connection_engine_dispatch() till it returns NULL and -/// check pn_connection_engine_finished() in case there is nothing more to do.. -/// -/// Note on error handling: the pn_connection_engine_*() functions do not return -/// an error code. If an error occurs it will be reported as a -/// PN_TRANSPORT_ERROR event and pn_connection_engine_finished() will return -/// true once all final events have been processed. -/// -/// @defgroup connection_engine The Connection Engine -/// @{ -/// - -#include <proton/condition.h> -#include <proton/event.h> +/** + * @file + * + * **Experimental** The connection IO API is a set of functions to simplify + * integrating proton with different IO and concurrency platforms. 
The portable + * parts of a Proton application should use the @ref engine types. We will + * use "application" to mean the portable part of the application and + * "integration" to mean code that integrates with a particular IO platform. + * + * The connection_engine functions take a @ref pn_connection_t\*, and perform common + * tasks involving the @ref pn_connection_t and its @ref pn_transport_t and + * @ref pn_collector_t so you can treat them as a unit. You can also work with + * these types directly for features not available via @ref connection_engine API. + * + * @defgroup connection_engine Connection Engine + * + * **Experimental**: Toolkit for integrating proton with arbitrary network or IO + * transports. Provides a single point of control for an AMQP connection and + * a simple bytes-in/bytes-out interface that lets you: + * + * - process AMQP-encoded bytes from some input byte stream + * - generate @ref pn_event_t events for your application to handle + * - encode resulting AMQP output bytes for some output byte stream + * + * The engine contains a @ref pn_connection_t, @ref pn_transport_t and @ref + * pn_collector_t and provides functions to operate on all three as a unit for + * IO integration. You can also use them directly for anything not covered by + * this API. + * + * For example a simple blocking IO integration with the imaginary "my_io" library: + * + * pn_connection_engine_t ce; + * pn_connection_engine_init(&ce, NULL, NULL); + * while (!pn_connection_engine_finished(&ce)) { + * // Dispatch events to be handled by the application. + * pn_event_t *e; + * while ((e = pn_connection_engine_event(&ce))!= NULL) { + * my_app_handle(e); // Pass to the application handler + * switch (pn_event_type(e)) { + * case PN_CONNECTION_INIT: pn_connection_engine_bind(&ce); break; + * // Only for full-duplex IO where read/write can shutdown separately.
+ * case PN_TRANSPORT_READ_CLOSED: my_io_shutdown_read(...); break; + * case PN_TRANSPORT_WRITE_CLOSED: my_io_shutdown_write(...); break; + * default: break; + * } + * pn_connection_engine_pop_event(&ce); + * } + * // Read from my_io into the connection buffer + * pn_rwbytes_t readbuf = pn_connection_engine_read_buffer(&ce); + * if (readbuf.size) { + * ssize_t n = my_io_read(readbuf.start, readbuf.size, ...); + * if (n > 0) { + * pn_connection_engine_read_done(&ce, n); + * } else if (n < 0) { + * pn_connection_engine_errorf(&ce, "read-err", "something-bad (%d): %s", n, ...); + * pn_connection_engine_read_close(&ce); + * } + * } + * // Write from connection buffer to my_io + * pn_bytes_t writebuf = pn_connection_engine_write_buffer(&ce); + * if (writebuf.size) { + * ssize_t n = my_io_write_data(writebuf.start, writebuf.size, ...); + * if (n < 0) { + * pn_connection_engine_errorf(&ce, "write-err", "something-bad (%d): %s", n, ...); + * pn_connection_engine_write_close(&ce); + * } else { + * pn_connection_engine_write_done(&ce, n); + * } + * } + * } + * // If my_io doesn't have separate read/write shutdown, then we should close it now. + * my_io_close(...); + * + * AMQP is a full-duplex, asynchronous protocol. The "read" and "write" sides of + * an AMQP connection can close separately, the example shows how to handle this + * for full-duplex IO or IO with a simple close. + * + * The engine buffers events, you must keep processing till + * pn_connection_engine_finished() is true, to ensure all reading, writing and event + * handling (including ERROR and FINAL events) is completely finished. + * + * ## Error handling + * + * The pn_connection_engine_*() functions do not return an error code. IO errors set + * the transport condition and are returned as a PN_TRANSPORT_ERROR. The integration + * code can set errors using pn_connection_engine_errorf() + * + * ## Other IO patterns + * + * This API supports asynchronous, proactive, non-blocking and reactive IO.
An + * integration does not have to follow the dispatch-read-write sequence above, + * but note that you should handle all available events before calling + * pn_connection_engine_read_buffer() and check that `size` is non-zero before + * starting a blocking or asynchronous read call. A `read` started while there + * are unprocessed CLOSE events in the buffer may never complete. + * + * ## Thread safety + * + * The @ref engine types are not thread safe, but each connection and its + * associated types forms an independent unit. Different connections can be + * processed concurrently by different threads. + * + * @defgroup connection_engine Connection IO + * @{ + */ + #include <proton/import_export.h> +#include <proton/event.h> #include <proton/types.h> +#include <stdarg.h> + #ifdef __cplusplus extern "C" { #endif -/// A connection engine is a trio of pn_connection_t, pn_transport_t and pn_collector_t. -/// Use the pn_connection_engine_*() functions to operate on it. -/// It is a plain struct, not a proton object. Use pn_connection_engine_init to set up -/// the initial objects and pn_connection_engine_final to release them. -/// +/** + * Struct containing a connection, transport and collector. See + * pn_connection_engine_init(), pn_connection_engine_destroy() and pn_connection_engine() + */ typedef struct pn_connection_engine_t { - pn_connection_t* connection; - pn_transport_t* transport; - pn_collector_t* collector; - pn_event_t* event; + pn_connection_t *connection; + pn_transport_t *transport; + pn_collector_t *collector; } pn_connection_engine_t; -/// Initialize a pn_connection_engine_t struct with a new connection and -/// transport. -/// -/// Call pn_connection_engine_final to free resources when you are done. 
-/// -///@return 0 on success, a proton error code on failure (@see error.h) -/// -PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t* engine); - -/// Free resources used by the engine, set the connection and transport pointers -/// to NULL. -PN_EXTERN void pn_connection_engine_final(pn_connection_engine_t* engine); - -/// Get the engine's read buffer. Read data from your IO source to buf.start, up -/// to a max of buf.size. Then call pn_connection_engine_read_done(). -/// -/// buf.size==0 means the engine cannot read presently, calling -/// pn_connection_engine_dispatch() may create more buffer space. -/// -PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t*); - -/// Consume the first n bytes of data in pn_connection_engine_read_buffer() and -/// update the buffer. -PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t*, size_t n); - -/// Close the read side of the transport when no more data is available. -/// Note there may still be events for pn_connection_engine_dispatch() or data -/// in pn_connection_engine_write_buffer() -PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t*); - -/// Get the engine's write buffer. Write data from buf.start to your IO destination, -/// up to a max of buf.size. Then call pn_connection_engine_write_done(). -/// -/// buf.size==0 means the engine has nothing to write presently. Calling -/// pn_connection_engine_dispatch() may generate more data. -PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t*); - -/// Call when the first n bytes of pn_connection_engine_write_buffer() have been -/// written to IO and can be re-used for new data. Updates the buffer. -PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t*, size_t n); - -/// Call when the write side of IO has closed and no more data can be written. 
-/// Note that there may still be events for pn_connection_engine_dispatch() or -/// data to read into pn_connection_engine_read_buffer(). -PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t*); - -/// Close both sides of the transport, equivalent to -/// pn_connection_engine_read_close(); pn_connection_engine_write_close() -/// -/// You must still call pn_connection_engine_dispatch() to process final -/// events. -/// -/// To provide transport error information to the handler, set it on -/// pn_connection_engine_condition() -/// *before* calling pn_connection_engine_disconnected(). This sets -/// the error on the pn_transport_t object. -/// -/// Note this does *not* modify the pn_connection_t, so you can distinguish -/// between a connection close error sent by the remote peer (which will set the -/// connection condition) and a transport error (which sets the transport -/// condition.) -/// -PN_EXTERN void pn_connection_engine_disconnected(pn_connection_engine_t*); - -/// Get the next available event. -/// Call in a loop until it returns NULL to dispatch all available events. -/// Note this call may modify the read and write buffers. -/// -/// @return Pointer to the next event, or NULL if there are none available. -/// -PN_EXTERN pn_event_t* pn_connection_engine_dispatch(pn_connection_engine_t*); - -/// Return true if the engine is finished - all data has been written, all -/// events have been handled and the transport is closed. -PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t*); - -/// Get the AMQP connection, owned by the pn_connection_engine_t. -PN_EXTERN pn_connection_t* pn_connection_engine_connection(pn_connection_engine_t*); - -/// Get the proton transport, owned by the pn_connection_engine_t. -PN_EXTERN pn_transport_t* pn_connection_engine_transport(pn_connection_engine_t*); - -/// Get the condition object for the engine's transport. 
-/// -/// Note that IO errors should be set on this, the transport condition, not on -/// the pn_connection_t condition. The connection's condition is for errors -/// received via the AMQP protocol, the transport condition is for errors in the -/// the IO layer such as a socket read or disconnect errors. -/// -PN_EXTERN pn_condition_t* pn_connection_engine_condition(pn_connection_engine_t*); +/** + * Set #connection and #transport to the provided values, or create a new + * @ref pn_connection_t or @ref pn_transport_t if either is NULL. + * The provided values belong to the connection engine and will be freed by + * pn_connection_engine_destroy() + * + * Create a new @ref pn_collector_t and set as #collector. + * + * The transport and connection are *not* bound at this point. You should + * configure them as needed and let the application handle the + * PN_CONNECTION_INIT from pn_connection_engine_event() before calling + * pn_connection_engine_bind(). + * + * @return 0 on success. If any allocation fails, calls pn_connection_engine_destroy() and returns PN_OUT_OF_MEMORY + */ +PN_EXTERN int pn_connection_engine_init(pn_connection_engine_t*, pn_connection_t*, pn_transport_t*); + +/** + * Bind the connection to the transport when the external IO is ready. + * + * The following functions (if called at all) must be called *before* bind: + * pn_connection_set_username(), pn_connection_set_password(), pn_transport_set_server() + * + * If there is an external IO error during setup, set a transport error, close + * the transport and then bind. The error events are reported to the application + * via pn_connection_engine_event(). + * + * @return an error code if the bind fails. + */ +PN_EXTERN int pn_connection_engine_bind(pn_connection_engine_t *); + +/** + * Unbind, release and free #connection, #transport and #collector. Set all pointers to NULL. + * Does not free the @ref pn_connection_engine_t struct itself.
+ */ +PN_EXTERN void pn_connection_engine_destroy(pn_connection_engine_t *); + +/** + * Get the read buffer. + * + * Copy data from your input byte source to buf.start, up to buf.size. + * Call pn_connection_engine_read_done() when reading is complete. + * + * buf.size==0 means reading is not possible: no buffer space or the read side is closed. + */ +PN_EXTERN pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *); + +/** + * Process the first n bytes of data in pn_connection_engine_read_buffer() and + * reclaim the buffer space. + */ +PN_EXTERN void pn_connection_engine_read_done(pn_connection_engine_t *, size_t n); + +/** + * Close the read side. Call when the IO can no longer be read. + */ +PN_EXTERN void pn_connection_engine_read_close(pn_connection_engine_t *); + +/** + * True if read side is closed. + */ +PN_EXTERN bool pn_connection_engine_read_closed(pn_connection_engine_t *); + +/** + * Get the write buffer. + * + * Write data from buf.start to your IO destination, up to a max of buf.size. + * Call pn_connection_engine_write_done() when writing is complete. + * + * buf.size==0 means there is nothing to write. + */ + PN_EXTERN pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *); + +/** + * Call when the first n bytes of pn_connection_engine_write_buffer() have been + * written to IO. Reclaims the buffer space and resets the write buffer. + */ +PN_EXTERN void pn_connection_engine_write_done(pn_connection_engine_t *, size_t n); + +/** + * Close the write side. Call when IO can no longer be written to. + */ +PN_EXTERN void pn_connection_engine_write_close(pn_connection_engine_t *); + +/** + * True if write side is closed. + */ +PN_EXTERN bool pn_connection_engine_write_closed(pn_connection_engine_t *); + +/** + * Close both the read and write sides. + */ +PN_EXTERN void pn_connection_engine_close(pn_connection_engine_t * c); + +/** + * Get the current event. Call pn_connection_engine_pop_event() when done handling it.
+ * Note that if PN_TRACE_EVT is enabled this will log the event, so you should + * avoid calling it more than once per event. Use pn_connection_engine_has_event() + * to silently test if any events are available. + * + * @return NULL if there are no more events ready. Reading/writing data may produce more. + */ +PN_EXTERN pn_event_t* pn_connection_engine_event(pn_connection_engine_t *); + +/** + * True if pn_connection_engine_event() will return a non-NULL event. + */ +PN_EXTERN bool pn_connection_engine_has_event(pn_connection_engine_t *); + +/** + * Drop the current event, advance pn_connection_engine_event() to the next event. + */ +PN_EXTERN void pn_connection_engine_pop_event(pn_connection_engine_t *); + +/** + * Return true if the engine is closed for reading and writing and there are + * no more events. + * + * Call pn_connection_engine_destroy() to free all related memory. + */ +PN_EXTERN bool pn_connection_engine_finished(pn_connection_engine_t *); + +/** + * Set IO error information. + * + * The name and formatted description are set on the transport condition, and + * returned as a PN_TRANSPORT_ERROR event from pn_connection_engine_event(). + * + * You must call this *before* pn_connection_engine_read_close() or + * pn_connection_engine_write_close() to ensure the error is processed. + * + * If there is already a transport condition set, this call does nothing. For + * more complex cases, you can work with the transport condition directly using: + * + * pn_condition_t *cond = pn_transport_condition(pn_connection_transport(conn)); + */ +PN_EXTERN void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...); + +/** + * Set IO error information via a va_list, see pn_connection_engine_errorf() + */ +PN_EXTERN void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list); + +/** + * Log a string message using the connection's transport log.
+ */ +PN_EXTERN void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg); + +/** + * Log a printf formatted message using the connection's transport log. + */ +PN_EXTERN void pn_connection_engine_logf(pn_connection_engine_t *ce, char *fmt, ...); + +/** + * Log a printf formatted message via a va_list using the connection's transport log. + */ +PN_EXTERN void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap); ///@} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/include/proton/event.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/event.h b/proton-c/include/proton/event.h index 16d2bda..a4d3ce9 100644 --- a/proton-c/include/proton/event.h +++ b/proton-c/include/proton/event.h @@ -277,13 +277,19 @@ typedef enum { PN_TRANSPORT_ERROR, /** - * Indicates that the head of the transport has been closed. This + * Indicates that the "head" or writing end of the transport has been closed. This * means the transport will never produce more bytes for output to * the network. Events of this type point to the relevant transport. */ PN_TRANSPORT_HEAD_CLOSED, /** + * The write side of the transport is closed, it will no longer produce bytes + * to write to external IO. Synonym for PN_TRANSPORT_HEAD_CLOSED + */ + PN_TRANSPORT_WRITE_CLOSED = PN_TRANSPORT_HEAD_CLOSED, + + /** * Indicates that the tail of the transport has been closed. This * means the transport will never be able to process more bytes from * the network. Events of this type point to the relevant transport. @@ -291,6 +297,12 @@ typedef enum { PN_TRANSPORT_TAIL_CLOSED, /** + * The read side of the transport is closed, it will no longer read bytes + * from external IO. Synonym for PN_TRANSPORT_TAIL_CLOSED + */ + PN_TRANSPORT_READ_CLOSED = PN_TRANSPORT_TAIL_CLOSED, + + /** * Indicates that the both the head and tail of the transport are * closed.
Events of this type point to the relevant transport. */ @@ -302,7 +314,39 @@ typedef enum { PN_SELECTABLE_WRITABLE, PN_SELECTABLE_ERROR, PN_SELECTABLE_EXPIRED, - PN_SELECTABLE_FINAL + PN_SELECTABLE_FINAL, + + /** + * pn_connection_wake() was called. + * Events of this type point to the @ref pn_connection_t. + */ + PN_CONNECTION_WAKE, + + /** + * pn_listener_close() was called or an error occurred, see pn_listener_condition() + * Events of this type point to the @ref pn_listener_t. + */ + PN_LISTENER_CLOSE, + + /** + * pn_proactor_interrupt() was called to interrupt a proactor thread + * Events of this type point to the @ref pn_proactor_t. + */ + PN_PROACTOR_INTERRUPT, + + /** + * pn_proactor_set_timeout() time limit expired. + * Events of this type point to the @ref pn_proactor_t. + */ + PN_PROACTOR_TIMEOUT, + + /** + * The proactor became inactive: all listeners and connections are closed and + * their events processed, the timeout is expired. + * + * Events of this type point to the @ref pn_proactor_t. + */ + PN_PROACTOR_INACTIVE } pn_event_type_t; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/include/proton/extra.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/extra.h b/proton-c/include/proton/extra.h new file mode 100644 index 0000000..ea2e1ef --- /dev/null +++ b/proton-c/include/proton/extra.h @@ -0,0 +1,69 @@ +#ifndef EXTRA_H +#define EXTRA_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/type_compat.h> +#include <proton/types.h> +#include <stddef.h> +#include <stdlib.h> + +/** + * @cond INTERNAL + * Support for allocating extra aligned memory after a type. + */ + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * extra_t contains a size and is maximally aligned so the memory immediately + * after it can store any type of value. + */ +typedef union pn_extra_t { + size_t size; +#if __STDC_VERSION__ >= 201112 + max_align_t max; +#else +/* Not standard but fairly safe */ + uint64_t i; + long double d; + void *v; + void (*fp)(void); +#endif +} pn_extra_t; + +static inline pn_rwbytes_t pn_extra_rwbytes(pn_extra_t *x) { + return pn_rwbytes(x->size, (char*)(x+1)); +} + +/* Declare private helper struct for T */ +#define PN_EXTRA_DECLARE(T) typedef struct T##__extra { T base; pn_extra_t extra; } T##__extra +#define PN_EXTRA_SIZEOF(T, N) (sizeof(T##__extra)+(N)) +#define PN_EXTRA_GET(T, P) pn_extra_rwbytes(&((T##__extra*)(P))->extra) + +#ifdef __cplusplus +} +#endif + +/** @endcond */ + +#endif // EXTRA_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/include/proton/listener.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/listener.h b/proton-c/include/proton/listener.h new file mode 100644 index 0000000..f55479b --- /dev/null +++ b/proton-c/include/proton/listener.h @@ -0,0 +1,76 @@ +#ifndef PROTON_LISTENER_H +#define PROTON_LISTENER_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor 
license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/types.h> + +#ifdef __cplusplus +extern "C" { +#endif + +/** + * @file + * + * Listener API for the proton @proactor + * + * @defgroup listener Listener + * @ingroup proactor + * @{ + */ + +typedef struct pn_proactor_t pn_proactor_t; +typedef struct pn_condition_t pn_condition_t; + +/** + * Listener accepts connections, see pn_proactor_listen() + */ +typedef struct pn_listener_t pn_listener_t; + +/** + * The proactor that created the listener. + */ +pn_proactor_t *pn_listener_proactor(pn_listener_t *c); + +/** + * Get the error condition for a listener. + */ +pn_condition_t *pn_listener_condition(pn_listener_t *l); + +/** + * Get the user-provided value associated with the listener in pn_proactor_listen() + * The start address is aligned so you can cast it to any type. + */ +pn_rwbytes_t pn_listener_get_extra(pn_listener_t*); + +/** + * Close the listener (thread safe). 
+ */ +void pn_listener_close(pn_listener_t *l); + +/** + *@} + */ + +#ifdef __cplusplus +} +#endif + +#endif // PROTON_LISTENER_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/include/proton/object.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/object.h b/proton-c/include/proton/object.h index bafdcf4..0433b58 100644 --- a/proton-c/include/proton/object.h +++ b/proton-c/include/proton/object.h @@ -159,6 +159,29 @@ PREFIX ## _t *PREFIX ## _new(void) { \ PREFIX ## _inspect \ } +/* Class to identify a plain C struct in a pn_event_t. No refcounting or memory management. */ +#define PN_STRUCT_CLASSDEF(PREFIX, CID) \ + const pn_class_t *PREFIX ## __class(void); \ + static const pn_class_t *PREFIX ## _reify(void *p) { return PREFIX ## __class(); } \ + const pn_class_t *PREFIX ## __class(void) { \ + static const pn_class_t clazz = { \ + #PREFIX, \ + CID, \ + NULL, /*_new*/ \ + NULL, /*_initialize*/ \ + pn_void_incref, \ + pn_void_decref, \ + pn_void_refcount, \ + NULL, /* _finalize */ \ + NULL, /* _free */ \ + PREFIX ## _reify, \ + pn_void_hashcode, \ + pn_void_compare, \ + pn_void_inspect \ + }; \ + return &clazz; \ + } + PN_EXTERN pn_cid_t pn_class_id(const pn_class_t *clazz); PN_EXTERN const char *pn_class_name(const pn_class_t *clazz); PN_EXTERN void *pn_class_new(const pn_class_t *clazz, size_t size); @@ -181,6 +204,10 @@ PN_EXTERN intptr_t pn_class_compare(const pn_class_t *clazz, void *a, void *b); PN_EXTERN bool pn_class_equals(const pn_class_t *clazz, void *a, void *b); PN_EXTERN int pn_class_inspect(const pn_class_t *clazz, void *object, pn_string_t *dst); +PN_EXTERN void *pn_void_new(const pn_class_t *clazz, size_t size); +PN_EXTERN void pn_void_incref(void *object); +PN_EXTERN void pn_void_decref(void *object); +PN_EXTERN int pn_void_refcount(void *object); PN_EXTERN uintptr_t pn_void_hashcode(void *object); PN_EXTERN intptr_t pn_void_compare(void *a, void *b); PN_EXTERN 
int pn_void_inspect(void *object, pn_string_t *dst); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/include/proton/proactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/proactor.h b/proton-c/include/proton/proactor.h new file mode 100644 index 0000000..49d7b6a --- /dev/null +++ b/proton-c/include/proton/proactor.h @@ -0,0 +1,174 @@ +#ifndef PROTON_PROACTOR_H +#define PROTON_PROACTOR_H + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include <proton/types.h> +#include <proton/import_export.h> +#include <proton/listener.h> + +#ifdef __cplusplus +extern "C" { +#endif + +typedef struct pn_condition_t pn_condition_t; + +/** + * @file + * + * **Experimental**: Proactor API for the the proton @ref engine. + * + * @defgroup proactor Proactor + * + * **Experimental**: Proactor API for portable, multi-threaded, asynchronous applications. + * + * The proactor establishes and listens for connections. It creates the @ref + * "transport" transport that sends and receives data over the network and + * delivers @ref "events" event to application threads for processing. 
+ * + * ## Multi-threading + * + * The @ref proactor is thread-safe, but the @ref "protocol engine" is not. The + * proactor ensures that each @ref "connection" connection and its associated + * values (@ref session, @ref link etc.) is processed sequentially, even if there + * are multiple application threads. See pn_proactor_wait() + * + * @{ + */ + +/** + * The proactor creates and manages @ref "transports" transport and delivers @ref + * "event" events to the application. + * + */ +typedef struct pn_proactor_t pn_proactor_t; + +/** + * Create a proactor. Must be freed with pn_proactor_free() + */ +pn_proactor_t *pn_proactor(void); + +/** + * Free the proactor. + */ +void pn_proactor_free(pn_proactor_t*); + +/* FIXME aconway 2016-11-12: connect and listen need options to enable + things like websockets, alternate encryption or other features. + The "extra" parameter will be replaced by an "options" parameter + that will include providing extra data and other manipulators + to affect how the connection is processed. +*/ + +/** + * Asynchronous connect: a connection and transport will be created, the + * relevant events will be returned by pn_proactor_wait() + * + * Errors are indicated by PN_TRANSPORT_ERROR/PN_TRANSPORT_CLOSE events. + * + * @param extra bytes to copy to pn_connection_get_extra() on the new connection, @ref + * pn_rwbytes_null for nothing. + * + * @return error if the connect cannot be initiated e.g. an allocation failure. + * IO errors will be returned as transport events via pn_proactor_wait() + */ +int pn_proactor_connect(pn_proactor_t*, const char *host, const char *port, pn_bytes_t extra); + +/** + * Asynchronous listen: start listening, connections will be returned by pn_proactor_wait() + * Errors are indicated by a PN_LISTENER_CLOSE event, see pn_listener_condition(). + * + * @param extra bytes to copy to pn_connection_get_extra() on the new connection, @ref + * pn_rwbytes_null for nothing. + * + * @return error if the connect cannot be initiated e.g.
an allocation failure. + * IO errors will be returned as transport events via pn_proactor_wait() + */ +pn_listener_t *pn_proactor_listen(pn_proactor_t *, const char *host, const char *port, int backlog, pn_bytes_t extra); + +/** + * Wait for an event. Can be called in multiple threads concurrently. + * You must call pn_event_done() when the event has been handled. + * + * The proactor ensures that events that cannot be handled concurrently + * (e.g. events for the same connection) are never returned concurrently. + */ +pn_event_t *pn_proactor_wait(pn_proactor_t* d); + +/** + * Cause PN_PROACTOR_INTERRUPT to be returned to exactly one thread calling wait() + * for each call to pn_proactor_interrupt(). Thread safe. + */ +void pn_proactor_interrupt(pn_proactor_t* d); + +/** + * Cause PN_PROACTOR_TIMEOUT to be returned to a thread calling wait() after + * timeout milliseconds. Thread safe. + * + * Note calling pn_proactor_set_timeout() again before the PN_PROACTOR_TIMEOUT is + * delivered will cancel the previous timeout and deliver an event only after + * the new timeout. + */ +void pn_proactor_set_timeout(pn_proactor_t* d, pn_millis_t timeout); + +/** + * Cause a PN_CONNECTION_WAKE event to be returned by the proactor, even if + * there are no IO events pending for the connection. + * + * Thread safe: this is the only pn_connection_ function that can be + * called concurrently. + * + * Wakes can be "coalesced" - if several pn_connection_wake() calls happen + * concurrently, there may be only one PN_CONNECTION_WAKE event. + */ +void pn_connection_wake(pn_connection_t *c); + +/** + * The proactor that created the connection. + */ +pn_proactor_t *pn_connection_proactor(pn_connection_t *c); + +/** + * Call when a proactor event has been handled. Does nothing if not a proactor event.
+ * + * Thread safe: May be called from any thread but must be called exactly once + * for each event returned by pn_proactor_wait() + */ +void pn_event_done(pn_event_t *); + +/** + * Get the proactor that created the event or NULL. + */ +pn_proactor_t *pn_event_proactor(pn_event_t *); + +/** + * Get the listener for the event or NULL. + */ +pn_listener_t *pn_event_listener(pn_event_t *); + +/** + * @} + */ + +#ifdef __cplusplus +} +#endif + +#endif // PROTON_PROACTOR_H http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/include/proton/types.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/types.h b/proton-c/include/proton/types.h index 72db6f8..6799f86 100644 --- a/proton-c/include/proton/types.h +++ b/proton-c/include/proton/types.h @@ -71,6 +71,7 @@ typedef struct pn_bytes_t { } pn_bytes_t; PN_EXTERN pn_bytes_t pn_bytes(size_t size, const char *start); +static const pn_bytes_t pn_bytes_null = { 0, NULL }; /** A non-const byte buffer. */ typedef struct pn_rwbytes_t { @@ -79,6 +80,7 @@ typedef struct pn_rwbytes_t { } pn_rwbytes_t; PN_EXTERN pn_rwbytes_t pn_rwbytes(size_t size, char *start); +static const pn_bytes_t pn_rwbytes_null = { 0, NULL }; /** @} */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/src/engine/connection_engine.c ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/connection_engine.c b/proton-c/src/engine/connection_engine.c index 5d184a1..f31ddb0 100644 --- a/proton-c/src/engine/connection_engine.c +++ b/proton-c/src/engine/connection_engine.c @@ -16,109 +16,148 @@ * specific language governing permissions and limitations * under the License. 
*/ -#include "engine-internal.h" +#include "engine-internal.h" +#include <proton/condition.h> #include <proton/connection.h> #include <proton/connection_engine.h> #include <proton/transport.h> #include <string.h> -int pn_connection_engine_init(pn_connection_engine_t* e) { - memset(e, 0, sizeof(*e)); - e->connection = pn_connection(); - e->transport = pn_transport(); - e->collector = pn_collector(); - if (!e->connection || !e->transport || !e->collector) { - pn_connection_engine_final(e); - return PN_OUT_OF_MEMORY; - } - pn_connection_collect(e->connection, e->collector); - return PN_OK; +int pn_connection_engine_init(pn_connection_engine_t* ce, pn_connection_t *c, pn_transport_t *t) { + ce->connection = c ? c : pn_connection(); + ce->transport = t ? t : pn_transport(); + ce->collector = pn_collector(); + if (!ce->connection || !ce->transport || !ce->collector) { + pn_connection_engine_destroy(ce); + return PN_OUT_OF_MEMORY; + } + pn_connection_collect(ce->connection, ce->collector); + return 0; } -void pn_connection_engine_final(pn_connection_engine_t* e) { - if (e->transport && e->connection) { - pn_transport_unbind(e->transport); - pn_decref(e->transport); - } - if (e->collector) - pn_collector_free(e->collector); /* Break cycle with connection */ - if (e->connection) - pn_decref(e->connection); - memset(e, 0, sizeof(*e)); +int pn_connection_engine_bind(pn_connection_engine_t *ce) { + return pn_transport_bind(ce->transport, ce->connection); } -pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t* e) { - ssize_t cap = pn_transport_capacity(e->transport); - if (cap > 0) - return pn_rwbytes(cap, pn_transport_tail(e->transport)); - else - return pn_rwbytes(0, 0); +void pn_connection_engine_destroy(pn_connection_engine_t *ce) { + if (ce->transport) { + pn_transport_unbind(ce->transport); + pn_transport_free(ce->transport); + } + if (ce->collector) pn_collector_free(ce->collector); + if (ce->connection) pn_connection_free(ce->connection); + memset(ce, 
0, sizeof(*ce)); } -void pn_connection_engine_read_done(pn_connection_engine_t* e, size_t n) { - if (n > 0) - pn_transport_process(e->transport, n); +pn_rwbytes_t pn_connection_engine_read_buffer(pn_connection_engine_t *ce) { + ssize_t cap = pn_transport_capacity(ce->transport); + return (cap > 0) ? pn_rwbytes(cap, pn_transport_tail(ce->transport)) : pn_rwbytes(0, 0); } -void pn_connection_engine_read_close(pn_connection_engine_t* e) { - pn_transport_close_tail(e->transport); +void pn_connection_engine_read_done(pn_connection_engine_t *ce, size_t n) { + if (n > 0) pn_transport_process(ce->transport, n); } -pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t* e) { - ssize_t pending = pn_transport_pending(e->transport); - if (pending > 0) - return pn_bytes(pending, pn_transport_head(e->transport)); - else - return pn_bytes(0, 0); +bool pn_connection_engine_read_closed(pn_connection_engine_t *ce) { + return pn_transport_capacity(ce->transport) < 0; } -void pn_connection_engine_write_done(pn_connection_engine_t* e, size_t n) { - if (n > 0) - pn_transport_pop(e->transport, n); +void pn_connection_engine_read_close(pn_connection_engine_t *ce) { + if (!pn_connection_engine_read_closed(ce)) { + pn_transport_close_tail(ce->transport); + } } -void pn_connection_engine_write_close(pn_connection_engine_t* e){ - pn_transport_close_head(e->transport); +pn_bytes_t pn_connection_engine_write_buffer(pn_connection_engine_t *ce) { + ssize_t pending = pn_transport_pending(ce->transport); + return (pending > 0) ? 
+ pn_bytes(pending, pn_transport_head(ce->transport)) : pn_bytes_null; } -void pn_connection_engine_disconnected(pn_connection_engine_t* e) { - pn_connection_engine_read_close(e); - pn_connection_engine_write_close(e); +void pn_connection_engine_write_done(pn_connection_engine_t *ce, size_t n) { + if (n > 0) + pn_transport_pop(ce->transport, n); } -static void log_event(pn_connection_engine_t *engine, pn_event_t* event) { - if (event && engine->transport->trace & PN_TRACE_EVT) { - pn_string_t *str = pn_string(NULL); - pn_inspect(event, str); - pn_transport_log(engine->transport, pn_string_get(str)); - pn_free(str); +bool pn_connection_engine_write_closed(pn_connection_engine_t *ce) { + return pn_transport_pending(ce->transport) < 0; +} + +void pn_connection_engine_write_close(pn_connection_engine_t *ce) { + if (!pn_connection_engine_write_closed(ce)) { + pn_transport_close_head(ce->transport); + } +} + +void pn_connection_engine_close(pn_connection_engine_t *ce) { + pn_connection_engine_read_close(ce); + pn_connection_engine_write_close(ce); +} + +pn_event_t* pn_connection_engine_event(pn_connection_engine_t *ce) { + pn_event_t *e = ce->collector ? pn_collector_peek(ce->collector) : NULL; + if (e) { + pn_transport_t *t = ce->transport; + if (t && t->trace & PN_TRACE_EVT) { + /* This can log the same event twice if pn_connection_engine_event is called + * twice but for debugging it is much better to log before handling than after. 
+ */ + pn_string_clear(t->scratch); + pn_inspect(e, t->scratch); + pn_transport_log(t, pn_string_get(t->scratch)); } + } + return e; +} + +bool pn_connection_engine_has_event(pn_connection_engine_t *ce) { + return ce->collector && pn_collector_peek(ce->collector); } -pn_event_t* pn_connection_engine_dispatch(pn_connection_engine_t* e) { - if (e->event) { /* Already returned */ - if (pn_event_type(e->event) == PN_CONNECTION_INIT) - pn_transport_bind(e->transport, e->connection); - pn_collector_pop(e->collector); +void pn_connection_engine_pop_event(pn_connection_engine_t *ce) { + if (ce->collector) { + pn_event_t *e = pn_collector_peek(ce->collector); + if (pn_event_type(e) == PN_TRANSPORT_CLOSED) { /* The last event ever */ + /* Events can accumulate behind the TRANSPORT_CLOSED before the + * PN_TRANSPORT_CLOSED event is handled. They can never be processed + * so release them. + */ + pn_collector_release(ce->collector); + } else { + pn_collector_pop(ce->collector); } - e->event = pn_collector_peek(e->collector); - log_event(e, e->event); - return e->event; + + } +} + +bool pn_connection_engine_finished(pn_connection_engine_t *ce) { + return pn_transport_closed(ce->transport) && !pn_connection_engine_has_event(ce); +} + +void pn_connection_engine_verrorf(pn_connection_engine_t *ce, const char *name, const char *fmt, va_list ap) { + pn_transport_t *t = ce->transport; + pn_condition_t *cond = pn_transport_condition(t); + pn_string_vformat(t->scratch, fmt, ap); + pn_condition_set_name(cond, name); + pn_condition_set_description(cond, pn_string_get(t->scratch)); } -bool pn_connection_engine_finished(pn_connection_engine_t* e) { - return pn_transport_closed(e->transport) && (pn_collector_peek(e->collector) == NULL); +void pn_connection_engine_errorf(pn_connection_engine_t *ce, const char *name, const char *fmt, ...) 
{ + va_list ap; + va_start(ap, fmt); + pn_connection_engine_verrorf(ce, name, fmt, ap); + va_end(ap); } -pn_connection_t* pn_connection_engine_connection(pn_connection_engine_t* e) { - return e->connection; +void pn_connection_engine_log(pn_connection_engine_t *ce, const char *msg) { + pn_transport_log(ce->transport, msg); } -pn_transport_t* pn_connection_engine_transport(pn_connection_engine_t* e) { - return e->transport; +void pn_connection_engine_vlogf(pn_connection_engine_t *ce, const char *fmt, va_list ap) { + pn_transport_vlogf(ce->transport, fmt, ap); } -pn_condition_t* pn_connection_engine_condition(pn_connection_engine_t* e) { - return pn_transport_condition(e->transport); +void pn_connection_engine_vlog(pn_connection_engine_t *ce, const char *msg) { + pn_transport_log(ce->transport, msg); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/src/engine/engine-internal.h ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine-internal.h b/proton-c/src/engine/engine-internal.h index 761a840..df2aec2 100644 --- a/proton-c/src/engine/engine-internal.h +++ b/proton-c/src/engine/engine-internal.h @@ -22,13 +22,14 @@ * */ -#include <proton/object.h> -#include <proton/engine.h> -#include <proton/types.h> #include "buffer.h" #include "dispatcher/dispatcher.h" #include "util.h" +#include <proton/object.h> +#include <proton/engine.h> +#include <proton/types.h> + typedef enum pn_endpoint_type_t {CONNECTION, SESSION, SENDER, RECEIVER} pn_endpoint_type_t; typedef struct pn_endpoint_t pn_endpoint_t; @@ -218,7 +219,6 @@ struct pn_transport_t { bool auth_required; bool authenticated; bool encryption_required; - bool referenced; }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/src/engine/engine.c ---------------------------------------------------------------------- diff --git a/proton-c/src/engine/engine.c b/proton-c/src/engine/engine.c index cb1f479..a5774be 100644 
--- a/proton-c/src/engine/engine.c +++ b/proton-c/src/engine/engine.c @@ -32,6 +32,8 @@ #include "platform_fmt.h" #include "transport/transport.h" +#include <proton/extra.h> + static void pni_session_bound(pn_session_t *ssn); static void pni_link_bound(pn_link_t *link); @@ -208,6 +210,12 @@ void pn_condition_init(pn_condition_t *condition) condition->info = pn_data(0); } +pn_condition_t *pn_condition() { + pn_condition_t *c = (pn_condition_t*)malloc(sizeof(pn_condition_t)); + if (c) pn_condition_init(c); + return c; +} + void pn_condition_tini(pn_condition_t *condition) { pn_data_free(condition->info); @@ -215,6 +223,14 @@ void pn_condition_tini(pn_condition_t *condition) pn_free(condition->name); } +void pn_condition_free(pn_condition_t *c) { + if (c) { + pn_condition_clear(c); + pn_condition_tini(c); + free(c); + } +} + static void pni_add_session(pn_connection_t *conn, pn_session_t *ssn) { pn_list_add(conn->sessions, ssn); @@ -495,10 +511,15 @@ static void pn_connection_finalize(void *object) #define pn_connection_compare NULL #define pn_connection_inspect NULL -pn_connection_t *pn_connection(void) +PN_EXTRA_DECLARE(pn_connection_t); + +pn_rwbytes_t pn_connection_get_extra(pn_connection_t *c) { return PN_EXTRA_GET(pn_connection_t, c); } + +pn_connection_t *pn_connection_with_extra(size_t extra) { static const pn_class_t clazz = PN_CLASS(pn_connection); - pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, sizeof(pn_connection_t)); + size_t size = PN_EXTRA_SIZEOF(pn_connection_t, extra); + pn_connection_t *conn = (pn_connection_t *) pn_class_new(&clazz, size); if (!conn) return NULL; conn->endpoint_head = NULL; @@ -527,6 +548,10 @@ pn_connection_t *pn_connection(void) return conn; } +pn_connection_t *pn_connection(void) { + return pn_connection_with_extra(0); +} + static const pn_event_type_t endpoint_init_event_map[] = { PN_CONNECTION_INIT, /* CONNECTION */ PN_SESSION_INIT, /* SESSION */ @@ -545,6 +570,10 @@ void pn_connection_collect(pn_connection_t
*connection, pn_collector_t *collecto } } +pn_collector_t* pn_connection_collector(pn_connection_t *connection) { + return connection->collector; +} + pn_state_t pn_connection_state(pn_connection_t *connection) { return connection ? connection->endpoint.state : 0; @@ -2229,3 +2258,15 @@ pn_transport_t *pn_event_transport(pn_event_t *event) } } } + +int pn_condition_copy(pn_condition_t *dest, pn_condition_t *src) { + assert(dest); + assert(src); + int err = 0; + if (src != dest) { + err = pn_string_copy(dest->name, src->name); + if (!err) err = pn_string_copy(dest->description, src->description); + if (!err) err = pn_data_copy(dest->info, src->info); + } + return err; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/src/events/event.c ---------------------------------------------------------------------- diff --git a/proton-c/src/events/event.c b/proton-c/src/events/event.c index 5ad718e..2d09eda 100644 --- a/proton-c/src/events/event.c +++ b/proton-c/src/events/event.c @@ -382,7 +382,18 @@ const char *pn_event_type_name(pn_event_type_t type) return "PN_SELECTABLE_EXPIRED"; case PN_SELECTABLE_FINAL: return "PN_SELECTABLE_FINAL"; + case PN_CONNECTION_WAKE: + return "PN_CONNECTION_WAKE"; + case PN_LISTENER_CLOSE: + return "PN_LISTENER_CLOSE"; + case PN_PROACTOR_INTERRUPT: + return "PN_PROACTOR_INTERRUPT"; + case PN_PROACTOR_TIMEOUT: + return "PN_PROACTOR_TIMEOUT"; + case PN_PROACTOR_INACTIVE: + return "PN_PROACTOR_INACTIVE"; + default: + return "PN_UNKNOWN"; } - return NULL; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/proton-c/src/object/object.c ---------------------------------------------------------------------- diff --git a/proton-c/src/object/object.c b/proton-c/src/object/object.c index b0c1b33..a6952b6 100644 --- a/proton-c/src/object/object.c +++ b/proton-c/src/object/object.c @@ -32,10 +32,10 @@ intptr_t pn_object_compare(void *a, void *b) { return (intptr_t) a - (intptr_t) const pn_class_t PN_OBJECT[] =
{PN_CLASS(pn_object)}; #define pn_void_initialize NULL -static void *pn_void_new(const pn_class_t *clazz, size_t size) { return malloc(size); } -static void pn_void_incref(void *object) {} -static void pn_void_decref(void *object) {} -static int pn_void_refcount(void *object) { return -1; } +void *pn_void_new(const pn_class_t *clazz, size_t size) { return malloc(size); } +void pn_void_incref(void* p) {} +void pn_void_decref(void* p) {} +int pn_void_refcount(void *object) { return -1; } #define pn_void_finalize NULL static void pn_void_free(void *object) { free(object); } static const pn_class_t *pn_void_reify(void *object) { return PN_VOID; } @@ -199,7 +199,7 @@ typedef struct { void *pn_object_new(const pn_class_t *clazz, size_t size) { void *object = NULL; - pni_head_t *head = (pni_head_t *) malloc(sizeof(pni_head_t) + size); + pni_head_t *head = (pni_head_t *) calloc(1, sizeof(pni_head_t) + size); if (head != NULL) { object = head + 1; head->clazz = clazz; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/51d291b8/tools/cmake/Modules/FindLibuv.cmake ---------------------------------------------------------------------- diff --git a/tools/cmake/Modules/FindLibuv.cmake b/tools/cmake/Modules/FindLibuv.cmake new file mode 100644 index 0000000..ae3ab70 --- /dev/null +++ b/tools/cmake/Modules/FindLibuv.cmake @@ -0,0 +1,37 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Find libuv include dirs and libraries. +# +# Sets the following variables: +# +# Libuv_FOUND - True if headers and requested libraries were found +# Libuv_INCLUDE_DIRS - Libuv include directories +# Libuv_LIBRARIES - Link these to use libuv. +# +# This module reads hints about search locations from variables:: +# LIBUV_ROOT - Preferred installation prefix +# LIBUV_INCLUDEDIR - Preferred include directory e.g. <prefix>/include +# LIBUV_LIBRARYDIR - Preferred library directory e.g. <prefix>/lib + +find_library(Libuv_LIBRARIES NAMES uv libuv HINTS ${LIBUV_LIBRARYDIR} ${LIBUV_ROOT}) +find_path(Libuv_INCLUDE_DIRS NAMES uv.h HINTS ${LIBUV_INCLUDEDIR} ${LIBUV_ROOT} ${LIBUV_ROOT}/include ${CMAKE_INSTALL_PREFIX}/include PATHS /usr/include) + +include(FindPackageHandleStandardArgs) +find_package_handle_standard_args(Libuv DEFAULT_MSG Libuv_LIBRARIES Libuv_INCLUDE_DIRS) --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
