PROTON-1064: [ruby] Wrap pn_connection_driver_t as ConnectionDriver
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9bb1baad Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9bb1baad Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9bb1baad Branch: refs/heads/master Commit: 9bb1baad263e094427e863e3455b8bc7dbd1f949 Parents: b3d1b07 Author: Alan Conway <acon...@redhat.com> Authored: Mon Sep 18 22:31:48 2017 -0400 Committer: Alan Conway <acon...@redhat.com> Committed: Tue Nov 7 13:31:51 2017 -0500 ---------------------------------------------------------------------- proton-c/bindings/ruby/CMakeLists.txt | 6 +- proton-c/bindings/ruby/cproton.i | 677 +++++++++++++++++++ proton-c/bindings/ruby/lib/core/connection.rb | 2 +- .../bindings/ruby/lib/core/connection_driver.rb | 182 +++++ proton-c/bindings/ruby/lib/core/endpoint.rb | 14 +- proton-c/bindings/ruby/lib/qpid_proton.rb | 1 + proton-c/bindings/ruby/ruby.i | 640 ------------------ .../ruby/tests/test_connection_driver.rb | 70 ++ proton-c/include/proton/cproton.i | 13 +- 9 files changed, 956 insertions(+), 649 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bb1baad/proton-c/bindings/ruby/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/CMakeLists.txt b/proton-c/bindings/ruby/CMakeLists.txt index 93364e2..3397b43 100644 --- a/proton-c/bindings/ruby/CMakeLists.txt +++ b/proton-c/bindings/ruby/CMakeLists.txt @@ -27,7 +27,7 @@ list(APPEND SWIG_MODULE_cproton-ruby_EXTRA_DEPS ${PROTON_HEADERS} ) include_directories (${RUBY_INCLUDE_PATH}) -swig_add_library(cproton-ruby LANGUAGE ruby SOURCES ruby.i) +swig_add_library(cproton-ruby LANGUAGE ruby SOURCES cproton.i) swig_link_libraries(cproton-ruby ${BINDING_DEPS} ${RUBY_LIBRARY}) # set a compiler macro to relay the Ruby version to the extension. @@ -55,11 +55,11 @@ if (GEM_EXE) add_custom_command( OUTPUT ${bin}/qpid_proton-${PN_VERSION}.gem COMMAND ${CMAKE_COMMAND} -E copy_directory ${src} ${bin}/gem - COMMAND ${CMAKE_COMMAND} -E copy ${bin}/rubyRUBY_wrap.c ${bin}/gem/ext/cproton/cproton.c + COMMAND ${CMAKE_COMMAND} -E copy ${bin}/cprotonRUBY_wrap.c ${bin}/gem/ext/cproton/cproton.c COMMAND ${GEM_EXE} build qpid_proton.gemspec COMMAND ${CMAKE_COMMAND} -E copy ${bin}/gem/qpid_proton-${PN_VERSION}.gem ${bin} WORKING_DIRECTORY ${bin}/gem - DEPENDS ${RUBY_SRC} ${src}/LICENSE ${src}/TODO ${src}/ChangeLog cproton-ruby ${bin}/rubyRUBY_wrap.c + DEPENDS ${RUBY_SRC} ${src}/LICENSE ${src}/TODO ${src}/ChangeLog cproton-ruby ${bin}/cprotonRUBY_wrap.c ) add_custom_target(ruby-gem ALL DEPENDS ${bin}/qpid_proton-${PN_VERSION}.gem ) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bb1baad/proton-c/bindings/ruby/cproton.i ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/cproton.i b/proton-c/bindings/ruby/cproton.i new file mode 100644 index 0000000..107f5d8 --- /dev/null +++ b/proton-c/bindings/ruby/cproton.i @@ -0,0 +1,677 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +%module cproton + +%{ +#include <proton/connection_driver.h> +#include <proton/engine.h> +#include <proton/handlers.h> +#include <proton/message.h> +#include <proton/messenger.h> +#include <proton/reactor.h> +#include <proton/sasl.h> +#include <proton/ssl.h> +#include <proton/types.h> +#include <proton/url.h> +%} + +/* +NOTE: According to ccache-swig man page: "Known problems are using +preprocessor directives within %inline blocks and the use of â#pragma SWIGâ." +This includes using macros in an %inline section. + +Keep preprocessor directives and macro expansions in the normal header section. +*/ + +%include <cstring.i> + +%cstring_output_withsize(char *OUTPUT, size_t *OUTPUT_SIZE) +%cstring_output_allocate_size(char **ALLOC_OUTPUT, size_t *ALLOC_SIZE, free(*$1)); +%cstring_output_maxsize(char *OUTPUT, size_t MAX_OUTPUT_SIZE) + +%{ +#if !defined(RSTRING_LEN) +# define RSTRING_LEN(x) (RSTRING(x)->len) +# define RSTRING_PTR(x) (RSTRING(x)->ptr) +#endif +%} + +%typemap(in) pn_bytes_t { + if ($input == Qnil) { + $1.start = NULL; + $1.size = 0; + } else { + $1.start = RSTRING_PTR($input); + if (!$1.start) { + $1.size = 0; + } + $1.size = RSTRING_LEN($input); + } +} + +%typemap(out) pn_bytes_t { + $result = rb_str_new($1.start, $1.size); +} + +%typemap(in) pn_atom_t +{ + if ($input == Qnil) + { + $1.type = PN_NULL; + } + else + { + switch(TYPE($input)) + { + case T_TRUE: + $1.type = PN_BOOL; + $1.u.as_bool = true; + break; + + case T_FALSE: + $1.type = PN_BOOL; + $1.u.as_bool = false; + break; + + case T_FLOAT: + $1.type = PN_FLOAT; + $1.u.as_float = NUM2DBL($input); + break; + + case T_STRING: + $1.type = PN_STRING; + $1.u.as_bytes.start = RSTRING_PTR($input); + if ($1.u.as_bytes.start) + { + $1.u.as_bytes.size = RSTRING_LEN($input); + } + else + { + $1.u.as_bytes.size = 0; + } + break; + + case T_FIXNUM: + $1.type = PN_INT; + $1.u.as_int = FIX2LONG($input); + break; + + case T_BIGNUM: + $1.type = PN_LONG; + $1.u.as_long = NUM2LL($input); + break; + + } + } +} + +%typemap(out) pn_atom_t +{ + switch($1.type) + { + case PN_NULL: + $result = Qnil; + break; + + case PN_BOOL: + $result = $1.u.as_bool ? Qtrue : Qfalse; + break; + + case PN_BYTE: + $result = INT2NUM($1.u.as_byte); + break; + + case PN_UBYTE: + $result = UINT2NUM($1.u.as_ubyte); + break; + + case PN_SHORT: + $result = INT2NUM($1.u.as_short); + break; + + case PN_USHORT: + $result = UINT2NUM($1.u.as_ushort); + break; + + case PN_INT: + $result = INT2NUM($1.u.as_int); + break; + + case PN_UINT: + $result = UINT2NUM($1.u.as_uint); + break; + + case PN_LONG: + $result = LL2NUM($1.u.as_long); + break; + + case PN_ULONG: + $result = ULL2NUM($1.u.as_ulong); + break; + + case PN_FLOAT: + $result = rb_float_new($1.u.as_float); + break; + + case PN_DOUBLE: + $result = rb_float_new($1.u.as_double); + break; + + case PN_STRING: + $result = rb_str_new($1.u.as_bytes.start, $1.u.as_bytes.size); + break; + + default: + break; + } +} + +%typemap (in) pn_decimal32_t +{ + $1 = FIX2UINT($input); +} + +%typemap (out) pn_decimal32_t +{ + $result = ULL2NUM($1); +} + +%typemap (in) pn_decimal64_t +{ + $1 = NUM2ULL($input); +} + +%typemap (out) pn_decimal64_t +{ + $result = ULL2NUM($1); +} + +%typemap (in) pn_decimal128_t +{ + int index; + + for(index = 0; index < 16; index++) + { + VALUE element = rb_ary_entry($input, index); + $1.bytes[16 - (index + 1)] = FIX2INT(element); + } +} + +%typemap (out) pn_decimal128_t +{ + int index; + + $result = rb_ary_new2(16); + for(index = 0; index < 16; index++) + { + rb_ary_store($result, 16 - (index + 1), CHR2FIX($1.bytes[index])); + } +} + +%typemap (in) pn_uuid_t +{ + int index; + + for(index = 0; index < 16; index++) + { + VALUE element = rb_ary_entry($input, index); + $1.bytes[16 - (index + 1)] = FIX2INT(element); + } +} + +%typemap (out) pn_uuid_t +{ + int index; + + $result = rb_ary_new2(16); + for(index = 0; index < 16; index++) + { + rb_ary_store($result, 16 - (index + 1), CHR2FIX($1.bytes[index])); + } +} + +int pn_message_encode(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE); +%ignore pn_message_encode; + +ssize_t pn_link_send(pn_link_t *transport, char *STRING, size_t LENGTH); +%ignore pn_link_send; + +%rename(pn_link_recv) wrap_pn_link_recv; +%inline %{ + int wrap_pn_link_recv(pn_link_t *link, char *OUTPUT, size_t *OUTPUT_SIZE) { + ssize_t sz = pn_link_recv(link, OUTPUT, *OUTPUT_SIZE); + if (sz >= 0) { + *OUTPUT_SIZE = sz; + } else { + *OUTPUT_SIZE = 0; + } + return sz; + } +%} +%ignore pn_link_recv; + +ssize_t pn_transport_input(pn_transport_t *transport, char *STRING, size_t LENGTH); +%ignore pn_transport_input; + +%rename(pn_transport_output) wrap_pn_transport_output; +%inline %{ + int wrap_pn_transport_output(pn_transport_t *transport, char *OUTPUT, size_t *OUTPUT_SIZE) { + ssize_t sz = pn_transport_output(transport, OUTPUT, *OUTPUT_SIZE); + if (sz >= 0) { + *OUTPUT_SIZE = sz; + } else { + *OUTPUT_SIZE = 0; + } + return sz; + } +%} +%ignore pn_transport_output; + +%rename(pn_transport_peek) wrap_pn_transport_peek; +%inline %{ + int wrap_pn_transport_peek(pn_transport_t *transport, char *OUTPUT, size_t *OUTPUT_SIZE) { + ssize_t sz = pn_transport_peek(transport, OUTPUT, *OUTPUT_SIZE); + if(sz >= 0) { + *OUTPUT_SIZE = sz; + } else { + *OUTPUT_SIZE = 0; + } + return sz; + } +%} +%ignore pn_transport_peek; + +%rename(pn_delivery) wrap_pn_delivery; +%inline %{ + pn_delivery_t *wrap_pn_delivery(pn_link_t *link, char *STRING, size_t LENGTH) { + return pn_delivery(link, pn_dtag(STRING, LENGTH)); + } +%} +%ignore pn_delivery; + +// Suppress "Warning(451): Setting a const char * variable may leak memory." on pn_delivery_tag_t +%warnfilter(451) pn_delivery_tag_t; +%rename(pn_delivery_tag) wrap_pn_delivery_tag; +%inline %{ + void wrap_pn_delivery_tag(pn_delivery_t *delivery, char **ALLOC_OUTPUT, size_t *ALLOC_SIZE) { + pn_delivery_tag_t tag = pn_delivery_tag(delivery); + *ALLOC_OUTPUT = malloc(tag.size); + *ALLOC_SIZE = tag.size; + memcpy(*ALLOC_OUTPUT, tag.start, tag.size); + } +%} +%ignore pn_delivery_tag; + +bool pn_ssl_get_cipher_name(pn_ssl_t *ssl, char *OUTPUT, size_t MAX_OUTPUT_SIZE); +%ignore pn_ssl_get_cipher_name; + +bool pn_ssl_get_protocol_name(pn_ssl_t *ssl, char *OUTPUT, size_t MAX_OUTPUT_SIZE); +%ignore pn_ssl_get_protocol_name; + +%inline %{ +#if defined(RUBY20) || defined(RUBY21) + + typedef void *non_blocking_return_t; +#define RB_BLOCKING_CALL rb_thread_call_without_gvl + +#elif defined(RUBY19) + + typedef VALUE non_blocking_return_t; +#define RB_BLOCKING_CALL rb_thread_blocking_region + +#endif + %} + +%rename(pn_messenger_send) wrap_pn_messenger_send; +%rename(pn_messenger_recv) wrap_pn_messenger_recv; +%rename(pn_messenger_work) wrap_pn_messenger_work; + +%inline %{ + +#if defined(RB_BLOCKING_CALL) + + static non_blocking_return_t pn_messenger_send_no_gvl(void *args) { + VALUE result = Qnil; + pn_messenger_t *messenger = (pn_messenger_t *)((void **)args)[0]; + int *limit = (int *)((void **)args)[1]; + + int rc = pn_messenger_send(messenger, *limit); + + result = INT2NUM(rc); + return (non_blocking_return_t )result; + } + + static non_blocking_return_t pn_messenger_recv_no_gvl(void *args) { + VALUE result = Qnil; + pn_messenger_t *messenger = (pn_messenger_t *)((void **)args)[0]; + int *limit = (int *)((void **)args)[1]; + + int rc = pn_messenger_recv(messenger, *limit); + + result = INT2NUM(rc); + return (non_blocking_return_t )result; + } + + static non_blocking_return_t pn_messenger_work_no_gvl(void *args) { + VALUE result = Qnil; + pn_messenger_t *messenger = (pn_messenger_t *)((void **)args)[0]; + int *timeout = (int *)((void **)args)[1]; + + int rc = pn_messenger_work(messenger, *timeout); + + result = INT2NUM(rc); + return (non_blocking_return_t )result; + } + +#endif + + int wrap_pn_messenger_send(pn_messenger_t *messenger, int limit) { + int result = 0; + +#if defined(RB_BLOCKING_CALL) + + // only release the gil if we're blocking + if(pn_messenger_is_blocking(messenger)) { + VALUE rc; + void* args[2]; + + args[0] = messenger; + args[1] = &limit; + + rc = RB_BLOCKING_CALL(pn_messenger_send_no_gvl, + &args, RUBY_UBF_PROCESS, NULL); + + if(RTEST(rc)) + { + result = FIX2INT(rc); + } + } + +#else // !defined(RB_BLOCKING_CALL) + result = pn_messenger_send(messenger, limit); +#endif // defined(RB_BLOCKING_CALL) + + return result; + } + + int wrap_pn_messenger_recv(pn_messenger_t *messenger, int limit) { + int result = 0; + +#if defined(RB_BLOCKING_CALL) + // only release the gil if we're blocking + if(pn_messenger_is_blocking(messenger)) { + VALUE rc; + void* args[2]; + + args[0] = messenger; + args[1] = &limit; + + rc = RB_BLOCKING_CALL(pn_messenger_recv_no_gvl, + &args, RUBY_UBF_PROCESS, NULL); + + if(RTEST(rc)) + { + result = FIX2INT(rc); + } + + } else { + result = pn_messenger_recv(messenger, limit); + } +#else // !defined(RB_BLOCKING_CALL) + result = pn_messenger_recv(messenger, limit); +#endif // defined(RB_BLOCKING_CALL) + + return result; + } + + int wrap_pn_messenger_work(pn_messenger_t *messenger, int timeout) { + int result = 0; + +#if defined(RB_BLOCKING_CALL) + // only release the gil if we're blocking + if(timeout) { + VALUE rc; + void* args[2]; + + args[0] = messenger; + args[1] = &timeout; + + rc = RB_BLOCKING_CALL(pn_messenger_work_no_gvl, + &args, RUBY_UBF_PROCESS, NULL); + + if(RTEST(rc)) + { + result = FIX2INT(rc); + } + } else { + result = pn_messenger_work(messenger, timeout); + } +#else + result = pn_messenger_work(messenger, timeout); +#endif + + return result; + } + +%} + +%ignore pn_messenger_send; +%ignore pn_messenger_recv; +%ignore pn_messenger_work; + +%{ +typedef struct Pn_rbkey_t { + void *registry; + char *method; + char *key_value; +} Pn_rbkey_t; + +void Pn_rbkey_initialize(void *vp_rbkey) { + Pn_rbkey_t *rbkey = (Pn_rbkey_t*)vp_rbkey; + assert(rbkey); + rbkey->registry = NULL; + rbkey->method = NULL; + rbkey->key_value = NULL; +} + +void Pn_rbkey_finalize(void *vp_rbkey) { + Pn_rbkey_t *rbkey = (Pn_rbkey_t*)vp_rbkey; + if(rbkey && rbkey->registry && rbkey->method && rbkey->key_value) { + rb_funcall((VALUE )rbkey->registry, rb_intern(rbkey->method), 1, rb_str_new2(rbkey->key_value)); + } + if(rbkey->key_value) { + free(rbkey->key_value); + rbkey->key_value = NULL; + } +} + +/* NOTE: no macro or preprocessor definitions in %inline sections */ +#define CID_Pn_rbkey CID_pn_void +#define Pn_rbkey_inspect NULL +#define Pn_rbkey_compare NULL +#define Pn_rbkey_hashcode NULL + +pn_class_t* Pn_rbkey__class(void) { + static pn_class_t clazz = PN_CLASS(Pn_rbkey); + return &clazz; +} + +Pn_rbkey_t *Pn_rbkey_new(void) { + return (Pn_rbkey_t *) pn_class_new(Pn_rbkey__class(), sizeof(Pn_rbkey_t)); +} +%} + +pn_class_t* Pn_rbkey__class(void); +Pn_rbkey_t *Pn_rbkey_new(void); + +%inline %{ + +Pn_rbkey_t *Pn_rbkey_new(void); + +void Pn_rbkey_set_registry(Pn_rbkey_t *rbkey, void *registry) { + assert(rbkey); + rbkey->registry = registry; +} + +void *Pn_rbkey_get_registry(Pn_rbkey_t *rbkey) { + assert(rbkey); + return rbkey->registry; +} + +void Pn_rbkey_set_method(Pn_rbkey_t *rbkey, char *method) { + assert(rbkey); + rbkey->method = method; +} + +char *Pn_rbkey_get_method(Pn_rbkey_t *rbkey) { + assert(rbkey); + return rbkey->method; +} + +void Pn_rbkey_set_key_value(Pn_rbkey_t *rbkey, char *key_value) { + assert(rbkey); + rbkey->key_value = malloc(strlen(key_value) + 1); + strncpy(rbkey->key_value, key_value, strlen(key_value) + 1); +} + +char *Pn_rbkey_get_key_value(Pn_rbkey_t *rbkey) { + assert(rbkey); + return rbkey->key_value; +} + +Pn_rbkey_t *pni_void2rbkey(void *object) { + return (Pn_rbkey_t *)object; +} + +VALUE pn_void2rb(void *object) { + return (VALUE )object; +} + +void *pn_rb2void(VALUE object) { + return (void *)object; +} + +VALUE pni_address_of(void *object) { + return ULL2NUM((unsigned long )object); +} + +%} + +//%rename(pn_collector_put) wrap_pn_collector_put; +//%inline %{ +// pn_event_t *wrap_pn_collector_put(pn_collector_t *collector, void *context, +// pn_event_type_t type) { +// return pn_collector_put(collector, PN_RBREF, context, type); +// } +// %} +//%ignore pn_collector_put; + +int pn_ssl_get_peer_hostname(pn_ssl_t *ssl, char *OUTPUT, size_t *OUTPUT_SIZE); +%ignore pn_ssl_get_peer_hostname; + +%inline %{ + + VALUE pni_ruby_get_proton_module() { + VALUE mQpid = rb_define_module("Qpid"); + return rb_define_module_under(mQpid, "Proton"); + } + + void pni_ruby_add_to_registry(VALUE key, VALUE value) { + VALUE result = rb_funcall(pni_ruby_get_proton_module(), rb_intern("add_to_registry"), 2, key, value); + } + + VALUE pni_ruby_get_from_registry(VALUE key) { + return rb_funcall(pni_ruby_get_proton_module(), rb_intern("get_from_registry"), 1, key); + } + + void pni_ruby_delete_from_registry(VALUE stored_key) { + rb_funcall(pni_ruby_get_proton_module(), rb_intern("delete_from_registry"), 1, stored_key); + } + + typedef struct { + VALUE handler_key; + } Pni_rbhandler_t; + + static Pni_rbhandler_t *pni_rbhandler(pn_handler_t *handler) { + return (Pni_rbhandler_t *) pn_handler_mem(handler); + } + + static void pni_rbdispatch(pn_handler_t *handler, pn_event_t *event, pn_event_type_t type) { + Pni_rbhandler_t *rbh = pni_rbhandler(handler); + VALUE rbhandler = pni_ruby_get_from_registry(rbh->handler_key); + + rb_funcall(rbhandler, rb_intern("dispatch"), 2, SWIG_NewPointerObj(event, SWIGTYPE_p_pn_event_t, 0), INT2FIX(type)); + } + + static void pni_rbhandler_finalize(pn_handler_t *handler) { + Pni_rbhandler_t *rbh = pni_rbhandler(handler); + pni_ruby_delete_from_registry(rbh->handler_key); + } + + pn_handler_t *pn_rbhandler(VALUE handler) { + pn_handler_t *chandler = pn_handler_new(pni_rbdispatch, sizeof(Pni_rbhandler_t), pni_rbhandler_finalize); + Pni_rbhandler_t *rhy = pni_rbhandler(chandler); + + VALUE ruby_key = rb_class_new_instance(0, NULL, rb_cObject); + pni_ruby_add_to_registry(ruby_key, handler); + + rhy->handler_key = ruby_key; + + return chandler; + } + + + /* Helpers for working with pn_connection_driver_t */ + + size_t pni_connection_driver_read_size(pn_connection_driver_t* d) { + return pn_connection_driver_read_buffer(d).size; + } + + size_t pni_connection_driver_write_size(pn_connection_driver_t* d) { + return pn_connection_driver_write_buffer(d).size; + } + + pn_connection_t *pni_connection_driver_connection(pn_connection_driver_t* d) { + return d->connection; + } + + pn_transport_t *pni_connection_driver_transport(pn_connection_driver_t* d) { + return d->transport; + } + + size_t pni_connection_driver_read_copy(pn_connection_driver_t* d, char *STRING, size_t LENGTH ) { + pn_rwbytes_t rbuf = pn_connection_driver_read_buffer(d); + size_t n = LENGTH < rbuf.size ? LENGTH : rbuf.size; + memcpy(rbuf.start, STRING, n); + pn_connection_driver_read_done(d, n); + return n; + } + + pn_connection_driver_t *pni_connection_driver() { + pn_connection_driver_t *d = (pn_connection_driver_t*)malloc(sizeof(*d)); + if (pn_connection_driver_init(d, NULL, NULL) != 0) { + free(d); + return NULL; + } + return d; + } + +%} + +%include "proton/cproton.i" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bb1baad/proton-c/bindings/ruby/lib/core/connection.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/connection.rb b/proton-c/bindings/ruby/lib/core/connection.rb index ef785b2..6caf589 100644 --- a/proton-c/bindings/ruby/lib/core/connection.rb +++ b/proton-c/bindings/ruby/lib/core/connection.rb @@ -196,7 +196,7 @@ module Qpid::Proton # Open the local end of the connection. # # @option options [String] :container_id Unique AMQP container ID, defaults to a UUID - # @option [String] :link_prefix Prefix for generated link names, default is container_id + # @option options [String] :link_prefix Prefix for generated link names, default is container_id # def open(options={}) object_to_data(@offered_capabilities, Cproton.pn_connection_offered_capabilities(@impl)) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bb1baad/proton-c/bindings/ruby/lib/core/connection_driver.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/connection_driver.rb b/proton-c/bindings/ruby/lib/core/connection_driver.rb new file mode 100644 index 0000000..b5b38ac --- /dev/null +++ b/proton-c/bindings/ruby/lib/core/connection_driver.rb @@ -0,0 +1,182 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +require 'socket' + +module Qpid + module Proton + + # Associate an AMQP {Connection} with an {IO} and a {MessagingHandler} + # + # - Read AMQP binary data from the {IO} (#read, #process) + # - Call on_* methods on the {MessagingHandler} for AMQP events (#dispatch, #process) + # - Write AMQP binary data to the {IO} (#write, #process) + # + # Thread safety: The {ConnectionDriver} is not thread safe but separate + # {ConnectionDriver} instances can be processed concurrently. The + # {Container} handles multiple connections concurrently in multiple threads. + # + class ConnectionDriver + + # Create a {Connection} and associate it with +io+ and +handler+ + # + # @param io [#read_nonblock, #write_nonblock] An {IO} or {IO}-like object that responds + # to #read_nonblock and #write_nonblock. + # @param handler [MessagingHandler] The handler to be invoked for AMQP events + # + def initialize io, handler=nil + @impl = Cproton.pni_connection_driver or raise RuntimeError, "cannot create connection driver" + @io = io + @handler = handler || Handler::MessagingHandler.new # Default handler for default behaviour + @rbuf = "" # String to re-use as read buffer + end + + # @return [MessagingHandler] + attr_reader :handler + + # @return [Connection] + def connection() Connection.wrap(Cproton.pni_connection_driver_connection(@impl)); end + + # @return [Transport] + def transport() Transport.wrap(Cproton.pni_connection_driver_transport(@impl)); end + + # @return [IO] Allows ConnectionDriver to be passed directly to {IO#select} + def to_io() @io; end + + # @return [Bool] True if the driver can read more data + def can_read?() Cproton.pni_connection_driver_read_size(@impl) > 0; end + + # @return [Bool] True if the driver has data to write + def can_write?() Cproton.pni_connection_driver_write_size(@impl) > 0; end + + # True if read and write sides of the IO are closed. Note this does not imply + # {#finished?} since there may still be events to dispatch. + def closed? + Cproton.pn_connection_driver_read_closed(@impl) && + Cproton.pn_connection_driver_read_closed(@impl) + end + + # True if the ConnectionDriver has nothing left to do: {#closed?} and + # there are no more events to dispatch. + def finished?() Cproton.pn_connection_driver_finished(@impl); end + + # Dispatch available events, call the relevant on_* methods on the {#handler}. + def dispatch(extra_handlers = nil) + extra_handlers ||= [] + while event = Event::Event.wrap(Cproton.pn_connection_driver_next_event(@impl)) + event.dispatch(@handler) + extra_handlers.each { |h| event.dispatch h } + end + end + + # Read from IO without blocking. + # IO errors are not raised, they are passed to {#handler}.on_transport_error by {#dispatch} + def read + size = Cproton.pni_connection_driver_read_size(@impl) + return if size <= 0 + @io.read_nonblock(size, @rbuf) # Use the same string rbuf for reading each time + Cproton.pni_connection_driver_read_copy(@impl, @rbuf) unless @rbuf.empty? + rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR + # Try again later. + rescue EOFError # EOF is not an error + Cproton.pn_connection_driver_read_close(@impl) + rescue IOError => e # IOError is passed to the transport + error "read: #{e}" + Cproton.pn_connection_driver_read_close(@impl) + end + + # Write to IO without blocking. + # IO errors are not raised, they are passed to {#handler}.on_transport_error by {#dispatch} + def write + n = @io.write_nonblock(Cproton.pn_connection_driver_write_buffer(@impl)) + Cproton.pn_connection_driver_write_done(@impl, n) if n > 0 + rescue Errno::EWOULDBLOCK, Errno::EAGAIN, Errno::EINTR + # Try again later. + rescue IOError => e + error "write: #{e}" + Cproton.pn_connection_driver_write_close(@impl) + end + + # Generate timed events and IO, for example idle-timeout and heart-beat events. + # May generate events for {#dispatch} and change the readable/writeable state. + # + # @param [Time] now the current time, defaults to {Time#now}. + # + # @return [Time] time of the next scheduled event, or nil if there are no + # scheduled events. If non-nil, tick() must be called again no later than + # this time. + def tick(now=Time.now) + transport = Cproton.pni_connection_driver_transport(@impl) + ms = Cproton.pn_transport_tick(transport, (now.to_r * 1000).to_i) + return ms.zero? ? nil : Time.at(ms.to_r / 1000); + end + + # Do read, tick, write and dispatch without blocking. + # @param [Bool] io_readable true if the IO might be readable + # @param [Bool] io_writable true if the IO might be writeable + # @param [Time] now the current time + # @return [Time] Latest time to call {#process} again for scheduled events, + # or nil if there are no scheduled events + def process(io_readable=true, io_writable=true, now=Time.now) + read if io_readable + next_tick = tick(now) + if io_writable + dispatch + write + end + dispatch + return next_tick + end + + # Close the read side of the IO with optional error. + # @param e [#to_s] Non-nil error will call {#handler}.on_transport_error on next {#dispatch} + def close_read(e=nil) + @io.close_read + error(e) + Cproton.pn_connection_driver_read_close(@impl) + end + + # Close the write side of the IO with optional error + # @param e [#to_s] Non-nil error will call {#handler}.on_transport_error on next {#dispatch} + def close_write(e=nil) + @io.close_write + error(e) + Cproton.pn_connection_driver_write_close(@impl) + end + + # Close both sides of the IO with optional error + # @param e [#to_s] Non-nil error will call {#handler}.on_transport_error on next {#dispatch} + def close(e=nil) + if !closed? + close_read(e) + close_write(e) + end + end + + def to_s + transport = Cproton.pni_connection_driver_tranport(@impl) + return "#<#{self.class.name}[#{transport}]:#{@io}>" + end + + private + + def error(e) + Cproton.pn_connection_driver_errorf(@impl, "proton:io", "%s", e.to_s) if e + end + end + end +end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bb1baad/proton-c/bindings/ruby/lib/core/endpoint.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/core/endpoint.rb b/proton-c/bindings/ruby/lib/core/endpoint.rb index f3ddbcb..7c6f0a3 100644 --- a/proton-c/bindings/ruby/lib/core/endpoint.rb +++ b/proton-c/bindings/ruby/lib/core/endpoint.rb @@ -82,6 +82,12 @@ module Qpid::Proton self.connection.transport end + # @return [Bool] true if endpoint has sent and received a CLOSE frame + def closed?() check_state(LOCAL_CLOSED | REMOTE_CLOSED); end + + # @return [Bool] true if endpoint has sent and received an OPEN frame + def open?() check_state(LOCAL_ACTIVE | REMOTE_ACTIVE); end + def local_uninit? check_state(LOCAL_UNINIT) end @@ -106,10 +112,6 @@ module Qpid::Proton check_state(REMOTE_CLOSED) end - def check_state(state_mask) - !(self.state & state_mask).zero? - end - def handler reactor = Qpid::Proton::Reactor::Reactor.wrap(Cproton.pn_object_reactor(@impl)) if reactor.nil? @@ -135,6 +137,10 @@ module Qpid::Proton Cproton.pn_decref(impl) end + private + + def check_state(mask) (self.state & mask) == mask; end + end end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bb1baad/proton-c/bindings/ruby/lib/qpid_proton.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/lib/qpid_proton.rb b/proton-c/bindings/ruby/lib/qpid_proton.rb index 0180291..dae9b5f 100644 --- a/proton-c/bindings/ruby/lib/qpid_proton.rb +++ b/proton-c/bindings/ruby/lib/qpid_proton.rb @@ -78,6 +78,7 @@ require "core/ssl" require "core/transport" require "core/base_handler" require "core/url" +require "core/connection_driver" # Messenger API classes require "messenger/subscription" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bb1baad/proton-c/bindings/ruby/ruby.i ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/ruby.i b/proton-c/bindings/ruby/ruby.i deleted file mode 100644 index d5979f3..0000000 --- a/proton-c/bindings/ruby/ruby.i +++ /dev/null @@ -1,640 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -%module cproton - -%{ -#include <proton/engine.h> -#include <proton/message.h> -#include <proton/sasl.h> -#include <proton/messenger.h> -#include <proton/ssl.h> -#include <proton/types.h> -#include <proton/url.h> -#include <proton/reactor.h> -#include <proton/handlers.h> -%} - -/* -NOTE: According to ccache-swig man page: "Known problems are using -preprocessor directives within %inline blocks and the use of â#pragma SWIGâ." -This includes using macros in an %inline section. - -Keep preprocessor directives and macro expansions in the normal header section. -*/ - -%include <cstring.i> - -%cstring_output_withsize(char *OUTPUT, size_t *OUTPUT_SIZE) -%cstring_output_allocate_size(char **ALLOC_OUTPUT, size_t *ALLOC_SIZE, free(*$1)); -%cstring_output_maxsize(char *OUTPUT, size_t MAX_OUTPUT_SIZE) - -%{ -#if !defined(RSTRING_LEN) -# define RSTRING_LEN(x) (RSTRING(X)->len) -# define RSTRING_PTR(x) (RSTRING(x)->ptr) -#endif -%} - -%typemap(in) pn_bytes_t { - if ($input == Qnil) { - $1.start = NULL; - $1.size = 0; - } else { - $1.start = RSTRING_PTR($input); - if (!$1.start) { - $1.size = 0; - } - $1.size = RSTRING_LEN($input); - } -} - -%typemap(out) pn_bytes_t { - $result = rb_str_new($1.start, $1.size); -} - -%typemap(in) pn_atom_t -{ - if ($input == Qnil) - { - $1.type = PN_NULL; - } - else - { - switch(TYPE($input)) - { - case T_TRUE: - $1.type = PN_BOOL; - $1.u.as_bool = true; - break; - - case T_FALSE: - $1.type = PN_BOOL; - $1.u.as_bool = false; - break; - - case T_FLOAT: - $1.type = PN_FLOAT; - $1.u.as_float = NUM2DBL($input); - break; - - case T_STRING: - $1.type = PN_STRING; - $1.u.as_bytes.start = RSTRING_PTR($input); - if ($1.u.as_bytes.start) - { - $1.u.as_bytes.size = RSTRING_LEN($input); - } - else - { - $1.u.as_bytes.size = 0; - } - break; - - case T_FIXNUM: - $1.type = PN_INT; - $1.u.as_int = FIX2LONG($input); - break; - - case T_BIGNUM: - $1.type = PN_LONG; - $1.u.as_long = NUM2LL($input); - break; - - } - } -} - -%typemap(out) pn_atom_t -{ - switch($1.type) - { - case PN_NULL: - $result = Qnil; - break; - - case PN_BOOL: - $result = $1.u.as_bool ? Qtrue : Qfalse; - break; - - case PN_BYTE: - $result = INT2NUM($1.u.as_byte); - break; - - case PN_UBYTE: - $result = UINT2NUM($1.u.as_ubyte); - break; - - case PN_SHORT: - $result = INT2NUM($1.u.as_short); - break; - - case PN_USHORT: - $result = UINT2NUM($1.u.as_ushort); - break; - - case PN_INT: - $result = INT2NUM($1.u.as_int); - break; - - case PN_UINT: - $result = UINT2NUM($1.u.as_uint); - break; - - case PN_LONG: - $result = LL2NUM($1.u.as_long); - break; - - case PN_ULONG: - $result = ULL2NUM($1.u.as_ulong); - break; - - case PN_FLOAT: - $result = rb_float_new($1.u.as_float); - break; - - case PN_DOUBLE: - $result = rb_float_new($1.u.as_double); - break; - - case PN_STRING: - $result = rb_str_new($1.u.as_bytes.start, $1.u.as_bytes.size); - break; - - default: - break; - } -} - -%typemap (in) pn_decimal32_t -{ - $1 = FIX2UINT($input); -} - -%typemap (out) pn_decimal32_t -{ - $result = ULL2NUM($1); -} - -%typemap (in) pn_decimal64_t -{ - $1 = NUM2ULL($input); -} - -%typemap (out) pn_decimal64_t -{ - $result = ULL2NUM($1); -} - -%typemap (in) pn_decimal128_t -{ - int index; - - for(index = 0; index < 16; index++) - { - VALUE element = rb_ary_entry($input, index); - $1.bytes[16 - (index + 1)] = FIX2INT(element); - } -} - -%typemap (out) pn_decimal128_t -{ - int index; - - $result = rb_ary_new2(16); - for(index = 0; index < 16; index++) - { - rb_ary_store($result, 16 - (index + 1), CHR2FIX($1.bytes[index])); - } -} - -%typemap (in) pn_uuid_t -{ - int index; - - for(index = 0; index < 16; index++) - { - VALUE element = rb_ary_entry($input, index); - $1.bytes[16 - (index + 1)] = FIX2INT(element); - } -} - -%typemap (out) pn_uuid_t -{ - int index; - - $result = rb_ary_new2(16); - for(index = 0; index < 16; index++) - { - rb_ary_store($result, 16 - (index + 1), CHR2FIX($1.bytes[index])); - } -} - -int pn_message_encode(pn_message_t *msg, char *OUTPUT, size_t *OUTPUT_SIZE); -%ignore pn_message_encode; - -ssize_t pn_link_send(pn_link_t *transport, char *STRING, size_t LENGTH); -%ignore pn_link_send; - -%rename(pn_link_recv) wrap_pn_link_recv; -%inline %{ - int wrap_pn_link_recv(pn_link_t *link, char *OUTPUT, size_t *OUTPUT_SIZE) { - ssize_t sz = pn_link_recv(link, OUTPUT, *OUTPUT_SIZE); - if (sz >= 0) { - *OUTPUT_SIZE = sz; - } else { - *OUTPUT_SIZE = 0; - } - return sz; - } -%} -%ignore pn_link_recv; - -ssize_t pn_transport_input(pn_transport_t *transport, char *STRING, size_t LENGTH); -%ignore pn_transport_input; - -%rename(pn_transport_output) wrap_pn_transport_output; -%inline %{ - int wrap_pn_transport_output(pn_transport_t *transport, char *OUTPUT, size_t *OUTPUT_SIZE) { - ssize_t sz = pn_transport_output(transport, OUTPUT, *OUTPUT_SIZE); - if (sz >= 0) { - *OUTPUT_SIZE = sz; - } else { - *OUTPUT_SIZE = 0; - } - return sz; - } -%} -%ignore pn_transport_output; - -%rename(pn_transport_peek) wrap_pn_transport_peek; -%inline %{ - int wrap_pn_transport_peek(pn_transport_t *transport, char *OUTPUT, size_t *OUTPUT_SIZE) { - ssize_t sz = pn_transport_peek(transport, OUTPUT, *OUTPUT_SIZE); - if(sz >= 0) { - *OUTPUT_SIZE = sz; - } else { - *OUTPUT_SIZE = 0; - } - return sz; - } -%} -%ignore pn_transport_peek; - -%rename(pn_delivery) wrap_pn_delivery; -%inline %{ - pn_delivery_t *wrap_pn_delivery(pn_link_t *link, char *STRING, size_t LENGTH) { - return pn_delivery(link, pn_dtag(STRING, LENGTH)); - } -%} -%ignore pn_delivery; - -// Suppress "Warning(451): Setting a const char * variable may leak memory." on pn_delivery_tag_t -%warnfilter(451) pn_delivery_tag_t; -%rename(pn_delivery_tag) wrap_pn_delivery_tag; -%inline %{ - void wrap_pn_delivery_tag(pn_delivery_t *delivery, char **ALLOC_OUTPUT, size_t *ALLOC_SIZE) { - pn_delivery_tag_t tag = pn_delivery_tag(delivery); - *ALLOC_OUTPUT = malloc(tag.size); - *ALLOC_SIZE = tag.size; - memcpy(*ALLOC_OUTPUT, tag.start, tag.size); - } -%} -%ignore pn_delivery_tag; - -bool pn_ssl_get_cipher_name(pn_ssl_t *ssl, char *OUTPUT, size_t MAX_OUTPUT_SIZE); -%ignore pn_ssl_get_cipher_name; - -bool pn_ssl_get_protocol_name(pn_ssl_t *ssl, char *OUTPUT, size_t MAX_OUTPUT_SIZE); -%ignore pn_ssl_get_protocol_name; - -%inline %{ -#if defined(RUBY20) || defined(RUBY21) - - typedef void *non_blocking_return_t; -#define RB_BLOCKING_CALL rb_thread_call_without_gvl - -#elif defined(RUBY19) - - typedef VALUE non_blocking_return_t; -#define RB_BLOCKING_CALL rb_thread_blocking_region - -#endif - %} - -%rename(pn_messenger_send) wrap_pn_messenger_send; -%rename(pn_messenger_recv) wrap_pn_messenger_recv; -%rename(pn_messenger_work) wrap_pn_messenger_work; - -%inline %{ - -#if defined(RB_BLOCKING_CALL) - - static non_blocking_return_t pn_messenger_send_no_gvl(void *args) { - VALUE result = Qnil; - pn_messenger_t *messenger = (pn_messenger_t *)((void **)args)[0]; - int *limit = (int *)((void **)args)[1]; - - int rc = pn_messenger_send(messenger, *limit); - - result = INT2NUM(rc); - return (non_blocking_return_t )result; - } - - static non_blocking_return_t pn_messenger_recv_no_gvl(void *args) { - VALUE result = Qnil; - pn_messenger_t *messenger = (pn_messenger_t *)((void **)args)[0]; - int *limit = (int *)((void **)args)[1]; - - int rc = pn_messenger_recv(messenger, *limit); - - result = INT2NUM(rc); - return (non_blocking_return_t )result; - } - - static non_blocking_return_t pn_messenger_work_no_gvl(void *args) { - VALUE result = Qnil; - pn_messenger_t *messenger = (pn_messenger_t *)((void **)args)[0]; - int *timeout = (int *)((void **)args)[1]; - - int rc = pn_messenger_work(messenger, *timeout); - - result = INT2NUM(rc); - return (non_blocking_return_t )result; - } - -#endif - - int wrap_pn_messenger_send(pn_messenger_t *messenger, int limit) { - int result = 0; - -#if defined(RB_BLOCKING_CALL) - - // only release the gil if we're blocking - if(pn_messenger_is_blocking(messenger)) { - VALUE rc; - void* args[2]; - - args[0] = messenger; - args[1] = &limit; - - rc = RB_BLOCKING_CALL(pn_messenger_send_no_gvl, - &args, RUBY_UBF_PROCESS, NULL); - - if(RTEST(rc)) - { - result = FIX2INT(rc); - } - } - -#else // !defined(RB_BLOCKING_CALL) - result = pn_messenger_send(messenger, limit); -#endif // defined(RB_BLOCKING_CALL) - - return result; - } - - int wrap_pn_messenger_recv(pn_messenger_t *messenger, int limit) { - int result = 0; - -#if defined(RB_BLOCKING_CALL) - // only release the gil if we're blocking - if(pn_messenger_is_blocking(messenger)) { - VALUE rc; - void* args[2]; - - args[0] = messenger; - args[1] = &limit; - - rc = RB_BLOCKING_CALL(pn_messenger_recv_no_gvl, - &args, RUBY_UBF_PROCESS, NULL); - - if(RTEST(rc)) - { - result = FIX2INT(rc); - } - - } else { - result = pn_messenger_recv(messenger, limit); - } -#else // !defined(RB_BLOCKING_CALL) - result = pn_messenger_recv(messenger, limit); -#endif // defined(RB_BLOCKING_CALL) - - return result; - } - - int wrap_pn_messenger_work(pn_messenger_t *messenger, int timeout) { - int result = 0; - -#if defined(RB_BLOCKING_CALL) - // only release the gil if we're blocking - if(timeout) { - VALUE rc; - void* args[2]; - - args[0] = messenger; - args[1] = &timeout; - - rc = RB_BLOCKING_CALL(pn_messenger_work_no_gvl, - &args, RUBY_UBF_PROCESS, NULL); - - if(RTEST(rc)) - { - result = FIX2INT(rc); - } - } else { - result = pn_messenger_work(messenger, timeout); - } -#else - result = pn_messenger_work(messenger, timeout); -#endif - - return result; - } - -%} - -%ignore pn_messenger_send; -%ignore pn_messenger_recv; -%ignore pn_messenger_work; - -%{ -typedef struct Pn_rbkey_t { - void *registry; - char *method; - char *key_value; -} Pn_rbkey_t; - -void Pn_rbkey_initialize(void *vp_rbkey) { - Pn_rbkey_t *rbkey = (Pn_rbkey_t*)vp_rbkey; - assert(rbkey); - rbkey->registry = NULL; - rbkey->method = NULL; - rbkey->key_value = NULL; -} - -void Pn_rbkey_finalize(void *vp_rbkey) { - Pn_rbkey_t *rbkey = (Pn_rbkey_t*)vp_rbkey; - if(rbkey && rbkey->registry && rbkey->method && rbkey->key_value) { - rb_funcall((VALUE )rbkey->registry, rb_intern(rbkey->method), 1, rb_str_new2(rbkey->key_value)); - } - if(rbkey->key_value) { - free(rbkey->key_value); - rbkey->key_value = NULL; - } -} - -/* NOTE: no macro or preprocessor definitions in %inline sections */ -#define CID_Pn_rbkey CID_pn_void -#define Pn_rbkey_inspect NULL -#define Pn_rbkey_compare NULL -#define Pn_rbkey_hashcode NULL - -pn_class_t* Pn_rbkey__class(void) { - static pn_class_t clazz = PN_CLASS(Pn_rbkey); - return &clazz; -} - -Pn_rbkey_t *Pn_rbkey_new(void) { - return (Pn_rbkey_t *) pn_class_new(Pn_rbkey__class(), sizeof(Pn_rbkey_t)); -} -%} - -pn_class_t* Pn_rbkey__class(void); -Pn_rbkey_t *Pn_rbkey_new(void); - -%inline %{ - -Pn_rbkey_t *Pn_rbkey_new(void); - -void Pn_rbkey_set_registry(Pn_rbkey_t *rbkey, void *registry) { - assert(rbkey); - rbkey->registry = registry; -} - -void *Pn_rbkey_get_registry(Pn_rbkey_t *rbkey) { - assert(rbkey); - return rbkey->registry; -} - -void Pn_rbkey_set_method(Pn_rbkey_t *rbkey, char *method) { - assert(rbkey); - rbkey->method = method; -} - -char *Pn_rbkey_get_method(Pn_rbkey_t *rbkey) { - assert(rbkey); - return rbkey->method; -} - -void Pn_rbkey_set_key_value(Pn_rbkey_t *rbkey, char *key_value) { - assert(rbkey); - rbkey->key_value = malloc(strlen(key_value) + 1); - strncpy(rbkey->key_value, key_value, strlen(key_value) + 1); -} - -char *Pn_rbkey_get_key_value(Pn_rbkey_t *rbkey) { - assert(rbkey); - return rbkey->key_value; -} - -Pn_rbkey_t *pni_void2rbkey(void *object) { - return (Pn_rbkey_t *)object; -} - -VALUE pn_void2rb(void *object) { - return (VALUE )object; -} - -void *pn_rb2void(VALUE object) { - return (void *)object; -} - -VALUE pni_address_of(void *object) { - return ULL2NUM((unsigned long )object); -} - -%} - -//%rename(pn_collector_put) wrap_pn_collector_put; -//%inline %{ -// pn_event_t *wrap_pn_collector_put(pn_collector_t *collector, void *context, -// pn_event_type_t type) { -// return pn_collector_put(collector, PN_RBREF, context, type); -// } -// %} -//%ignore pn_collector_put; - -int pn_ssl_get_peer_hostname(pn_ssl_t *ssl, char *OUTPUT, size_t *OUTPUT_SIZE); -%ignore pn_ssl_get_peer_hostname; - -%inline %{ - - VALUE pni_ruby_get_proton_module() { - VALUE mQpid = rb_define_module("Qpid"); - return rb_define_module_under(mQpid, "Proton"); - } - - void pni_ruby_add_to_registry(VALUE key, VALUE value) { - VALUE result = rb_funcall(pni_ruby_get_proton_module(), rb_intern("add_to_registry"), 2, key, value); - } - - VALUE pni_ruby_get_from_registry(VALUE key) { - return rb_funcall(pni_ruby_get_proton_module(), rb_intern("get_from_registry"), 1, key); - } - - void pni_ruby_delete_from_registry(VALUE stored_key) { - rb_funcall(pni_ruby_get_proton_module(), rb_intern("delete_from_registry"), 1, stored_key); - } - - typedef struct { - VALUE handler_key; - } Pni_rbhandler_t; - - static Pni_rbhandler_t *pni_rbhandler(pn_handler_t *handler) { - return (Pni_rbhandler_t *) pn_handler_mem(handler); - } - - static void pni_rbdispatch(pn_handler_t *handler, pn_event_t *event, pn_event_type_t type) { - Pni_rbhandler_t *rbh = pni_rbhandler(handler); - VALUE rbhandler = pni_ruby_get_from_registry(rbh->handler_key); - - rb_funcall(rbhandler, rb_intern("dispatch"), 2, SWIG_NewPointerObj(event, SWIGTYPE_p_pn_event_t, 0), INT2FIX(type)); - } - - static void pni_rbhandler_finalize(pn_handler_t *handler) { - Pni_rbhandler_t *rbh = pni_rbhandler(handler); - pni_ruby_delete_from_registry(rbh->handler_key); - } - - pn_handler_t *pn_rbhandler(VALUE handler) { - pn_handler_t *chandler = pn_handler_new(pni_rbdispatch, sizeof(Pni_rbhandler_t), pni_rbhandler_finalize); - Pni_rbhandler_t *rhy = pni_rbhandler(chandler); - - VALUE ruby_key = rb_class_new_instance(0, NULL, rb_cObject); - pni_ruby_add_to_registry(ruby_key, handler); - - rhy->handler_key = ruby_key; - - return chandler; - } - -%} - -%include "proton/cproton.i" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bb1baad/proton-c/bindings/ruby/tests/test_connection_driver.rb ---------------------------------------------------------------------- diff --git a/proton-c/bindings/ruby/tests/test_connection_driver.rb b/proton-c/bindings/ruby/tests/test_connection_driver.rb new file mode 100644 index 0000000..2ddc8ef --- /dev/null +++ b/proton-c/bindings/ruby/tests/test_connection_driver.rb @@ -0,0 +1,70 @@ +#-- +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +#++ + +require 'test_tools' + +include Qpid::Proton + +class ConnectionDriverTest < Minitest::Test + + def setup + @sockets = Socket.pair(:LOCAL, :STREAM, 0) + end + + def test_send_recv + send_class = Class.new(MessagingHandler) do + attr_reader :accepted + def on_sendable(event) event.sender.send Message.new("foo"); end + def on_accepted(event) event.connection.close; @accepted = true; end + end + + recv_class = Class.new(MessagingHandler) do + attr_reader :message + def on_link_opened(event) event.link.flow(1); event.link.open; end + def on_message(event) @message = event.message; event.connection.close; end + end + + sender = ConnectionDriver.new(@sockets[0], send_class.new) + sender.connection.open(); + sender.connection.open_sender() + + receiver = ConnectionDriver.new(@sockets[1], recv_class.new) + drivers = [sender, receiver] + until drivers.all? { |d| d.finished? } + rd = drivers.select {|d| d.can_read? } + wr = drivers.select {|d| d.can_write? } + rs, ws = IO.select(rd, wr) + ws.each { |d| d.write; d.dispatch } + rs.each { |d| d.read; d.dispatch } + end + assert_equal(receiver.handler.message.body, "foo") + assert(sender.handler.accepted) + end + + def test_idle + idle_class = Class.new(MessagingHandler) do + def on_connection_bound(event) event.transport.idle_timeout = 10; end + end + drivers = [ConnectionDriver.new(@sockets[0], idle_class.new), ConnectionDriver.new(@sockets[1])] + drivers[0].connection.open() + now = Time.now + drivers.each { |d| d.process(true, true, now) } until drivers[0].connection.open? + assert_equal(10, drivers[0].transport.idle_timeout) + assert_in_delta(10, (drivers[0].tick(now) - now)*1000, 1) + end +end http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9bb1baad/proton-c/include/proton/cproton.i ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/cproton.i b/proton-c/include/proton/cproton.i index 931437e..5f375de 100644 --- a/proton-c/include/proton/cproton.i +++ b/proton-c/include/proton/cproton.i @@ -1053,7 +1053,18 @@ typedef unsigned long int uintptr_t; pn_selectable_t *pn_cast_pn_selectable(void *x) { return (pn_selectable_t *) x; } %} -%include "proton/url.h" +/* Connection driver */ +%{ +#include <proton/connection_driver.h> +%} +/* Don't wrap the pn_connection_driver_t struct, just the functions */ +%ignore pn_connection_driver_t; +%ignore pn_connection_driver_verrorf; +%ignore pn_connection_driver_logf; +%ignore pn_connection_driver_vlogf; +%include "proton/connection_driver.h" + +%include "proton/url.h" %include "proton/reactor.h" %include "proton/handlers.h" --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org