http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/endpoint.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/endpoint.cpp b/proton-c/bindings/cpp/src/endpoint.cpp new file mode 100644 index 0000000..2e656c9 --- /dev/null +++ b/proton-c/bindings/cpp/src/endpoint.cpp @@ -0,0 +1,40 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/endpoint.hpp" +#include "proton/connection.hpp" +#include "proton/transport.hpp" + +namespace proton { + +endpoint::endpoint() {} + +endpoint::~endpoint() {} + +class connection &endpoint::connection() { + return dynamic_cast<class connection&>(*this); +} + +class transport &endpoint::transport() { + return connection().transport(); +} + +}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/error.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/error.cpp b/proton-c/bindings/cpp/src/error.cpp new file mode 100644 index 0000000..18aaa7c --- /dev/null +++ b/proton-c/bindings/cpp/src/error.cpp @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +#include "proton/error.hpp" + +namespace proton { + +static const std::string prefix("proton: "); + +error::error(const std::string& msg) throw() : std::runtime_error(prefix+msg) {} + +message_reject::message_reject(const std::string& msg) throw() : error(msg) {} + +message_release::message_release(const std::string& msg) throw() : error(msg) {} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/event.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/event.cpp b/proton-c/bindings/cpp/src/event.cpp new file mode 100644 index 0000000..e12d19c --- /dev/null +++ b/proton-c/bindings/cpp/src/event.cpp @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "proton/reactor.h" +#include "proton/event.h" + +#include "proton/event.hpp" +#include "proton/handler.hpp" +#include "proton/error.hpp" + +#include "msg.hpp" +#include "contexts.hpp" + +namespace proton { + +event::event() {} + +event::~event() {} + + +class container &event::container() { + // Subclasses to override as appropriate + throw error(MSG("No container context for event")); +} + +class connection &event::connection() { + throw error(MSG("No connection context for event")); +} + +class sender event::sender() { + throw error(MSG("No sender context for event")); +} + +class receiver event::receiver() { + throw error(MSG("No receiver context for event")); +} + +class link event::link() { + throw error(MSG("No link context for event")); +} + +class message event::message() { + throw error(MSG("No message associated with event")); +} + +void event::message(class message &) { + throw error(MSG("Operation not supported for this type of event")); +} + + + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/handler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp new file mode 100644 index 0000000..8e0f675 --- /dev/null +++ b/proton-c/bindings/cpp/src/handler.cpp @@ -0,0 +1,35 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/handler.hpp" +#include "proton/event.hpp" + +namespace proton { + +handler::handler() {} +handler::~handler() {} + +void handler::on_unhandled(event &e) {} + +void handler::add_child_handler(handler &e) { + push_back(&e); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/interop_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/interop_test.cpp b/proton-c/bindings/cpp/src/interop_test.cpp index 0b05566..1d8a871 100644 --- a/proton-c/bindings/cpp/src/interop_test.cpp +++ b/proton-c/bindings/cpp/src/interop_test.cpp @@ -17,10 +17,10 @@ * under the License. 
*/ -#include "proton/Decoder.hpp" -#include "proton/Encoder.hpp" -#include "proton/Value.hpp" -#include "Msg.hpp" +#include "proton/decoder.hpp" +#include "proton/encoder.hpp" +#include "proton/value.hpp" +#include "msg.hpp" #include <stdexcept> #include <string> #include <sstream> @@ -30,9 +30,8 @@ using namespace std; using namespace proton; -using namespace proton::reactor; -std::string testsDir; +std::string tests_dir; struct Fail : public logic_error { Fail(const string& what) : logic_error(what) {} }; #define FAIL(WHAT) throw Fail(MSG(__FILE__ << ":" << __LINE__ << ": " << WHAT)) @@ -42,13 +41,13 @@ struct Fail : public logic_error { Fail(const string& what) : logic_error(what) string read(string filename) { - filename = testsDir+string("/interop/")+filename+string(".amqp"); + filename = tests_dir+string("/interop/")+filename+string(".amqp"); ifstream ifs(filename.c_str()); if (!ifs.good()) FAIL("Can't open " << filename); return string(istreambuf_iterator<char>(ifs), istreambuf_iterator<char>()); } -template <class T> T get(Decoder& d) { return d.getAs<T, TypeIdOf<T>::value>(); } +template <class T> T get(decoder& d) { return d.get_as<T, type_idOf<T>::value>(); } template <class T> std::string str(const T& value) { ostringstream oss; @@ -56,39 +55,39 @@ template <class T> std::string str(const T& value) { return oss.str(); } -// Test Data ostream operator -void testDataOstream() { - Decoder d(read("primitives")); +// Test data ostream operator +void test_data_ostream() { + decoder d(read("primitives")); ASSERT_EQUAL("true, false, 42, 42, -42, 12345, -12345, 12345, -12345, 0.125, 0.125", str(d)); } // Test extracting to exact AMQP types works corectly, extrating to invalid types fails. 
-void testDecoderPrimitvesExact() { - Decoder d(read("primitives")); +void test_decoder_primitves_exact() { + decoder d(read("primitives")); ASSERT(d.more()); - try { get<std::int8_t>(d); FAIL("got bool as byte"); } catch(DecodeError){} + try { get<std::int8_t>(d); FAIL("got bool as byte"); } catch(decode_error){} ASSERT_EQUAL(true, get<bool>(d)); ASSERT_EQUAL(false, get<bool>(d)); - try { get<std::int8_t>(d); FAIL("got ubyte as byte"); } catch(DecodeError){} + try { get<std::int8_t>(d); FAIL("got ubyte as byte"); } catch(decode_error){} ASSERT_EQUAL(42, get<std::uint8_t>(d)); - try { get<std::int32_t>(d); FAIL("got uint as ushort"); } catch(DecodeError){} + try { get<std::int32_t>(d); FAIL("got uint as ushort"); } catch(decode_error){} ASSERT_EQUAL(42, get<std::uint16_t>(d)); - try { get<std::uint16_t>(d); FAIL("got short as ushort"); } catch(DecodeError){} + try { get<std::uint16_t>(d); FAIL("got short as ushort"); } catch(decode_error){} ASSERT_EQUAL(-42, get<std::int16_t>(d)); ASSERT_EQUAL(12345, get<std::uint32_t>(d)); ASSERT_EQUAL(-12345, get<std::int32_t>(d)); ASSERT_EQUAL(12345, get<std::uint64_t>(d)); ASSERT_EQUAL(-12345, get<std::int64_t>(d)); - try { get<double>(d); FAIL("got float as double"); } catch(DecodeError){} + try { get<double>(d); FAIL("got float as double"); } catch(decode_error){} ASSERT_EQUAL(0.125, get<float>(d)); - try { get<float>(d); FAIL("got double as float"); } catch(DecodeError){} + try { get<float>(d); FAIL("got double as float"); } catch(decode_error){} ASSERT_EQUAL(0.125, get<double>(d)); ASSERT(!d.more()); } // Test inserting primitive sand encoding as AMQP. -void testEncoderPrimitives() { - Encoder e; +void test_encoder_primitives() { + encoder e; e << true << false; e << std::uint8_t(42); e << std::uint16_t(42) << std::int16_t(-42); @@ -101,16 +100,16 @@ void testEncoderPrimitives() { } // Test type conversions. 
-void testValueConversions() { - Value v; +void test_value_conversions() { + value v; ASSERT_EQUAL(true, bool(v = true)); - ASSERT_EQUAL(2, int(v=Byte(2))); - ASSERT_EQUAL(3, long(v=Byte(3))); - ASSERT_EQUAL(3, long(v=Byte(3))); - ASSERT_EQUAL(1.0, double(v=Float(1.0))); - ASSERT_EQUAL(1.0, float(v=Double(1.0))); - try { bool(v = Byte(1)); FAIL("got byte as bool"); } catch (DecodeError) {} - try { float(v = true); FAIL("got bool as float"); } catch (DecodeError) {} + ASSERT_EQUAL(2, int(v=amqp_byte(2))); + ASSERT_EQUAL(3, long(v=amqp_byte(3))); + ASSERT_EQUAL(3, long(v=amqp_byte(3))); + ASSERT_EQUAL(1.0, double(v=amqp_float(1.0))); + ASSERT_EQUAL(1.0, float(v=amqp_double(1.0))); + try { bool(v = amqp_byte(1)); FAIL("got byte as bool"); } catch (decode_error) {} + try { float(v = true); FAIL("got bool as float"); } catch (decode_error) {} } int run_test(void (*testfn)(), const char* name) { @@ -132,11 +131,11 @@ int run_test(void (*testfn)(), const char* name) { int main(int argc, char** argv) { int failed = 0; if (argc != 2) FAIL("Usage: " << argv[0] << " tests-dir"); - testsDir = argv[1]; + tests_dir = argv[1]; - failed += RUN_TEST(testDataOstream); - failed += RUN_TEST(testDecoderPrimitvesExact); - failed += RUN_TEST(testEncoderPrimitives); - failed += RUN_TEST(testValueConversions); + failed += RUN_TEST(test_data_ostream); + failed += RUN_TEST(test_decoder_primitves_exact); + failed += RUN_TEST(test_encoder_primitives); + failed += RUN_TEST(test_value_conversions); return failed; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/link.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/link.cpp b/proton-c/bindings/cpp/src/link.cpp new file mode 100644 index 0000000..2470510 --- /dev/null +++ b/proton-c/bindings/cpp/src/link.cpp @@ -0,0 +1,113 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/link.hpp" +#include "proton/error.hpp" +#include "proton/connection.hpp" +#include "connection_impl.hpp" +#include "msg.hpp" +#include "contexts.hpp" +#include "proton_impl_ref.hpp" + +#include "proton/connection.h" +#include "proton/session.h" +#include "proton/link.h" + +namespace proton { + +template class proton_handle<pn_link_t>; +typedef proton_impl_ref<link> PI; + +link::link(pn_link_t* p) { + verify_type(p); + PI::ctor(*this, p); + if (p) sender_link = pn_link_is_sender(p); +} +link::link() { + PI::ctor(*this, 0); +} +link::link(const link& c) : proton_handle<pn_link_t>() { + verify_type(impl_); + PI::copy(*this, c); + sender_link = c.sender_link; +} +link& link::operator=(const link& c) { + verify_type(impl_); + sender_link = c.sender_link; + return PI::assign(*this, c); +} +link::~link() { PI::dtor(*this); } + +void link::verify_type(pn_link_t *l) {} // Generic link can be sender or receiver + +pn_link_t *link::pn_link() const { return impl_; } + +void link::open() { + pn_link_open(impl_); +} + +void link::close() { + pn_link_close(impl_); +} + +bool link::is_sender() { + return impl_ && sender_link; +} + +bool link::is_receiver() { + return impl_ && !sender_link; +} + +int link::credit() { + return pn_link_credit(impl_); +} + +terminus 
link::source() { + return terminus(pn_link_source(impl_), this); +} + +terminus link::target() { + return terminus(pn_link_target(impl_), this); +} + +terminus link::remote_source() { + return terminus(pn_link_remote_source(impl_), this); +} + +terminus link::remote_target() { + return terminus(pn_link_remote_target(impl_), this); +} + +std::string link::name() { + return std::string(pn_link_name(impl_)); +} + +class connection &link::connection() { + pn_session_t *s = pn_link_session(impl_); + pn_connection_t *c = pn_session_connection(s); + return connection_impl::reactor_reference(c); +} + +link link::next(endpoint::State mask) { + + return link(pn_link_next(impl_, (pn_state_t) mask)); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/message.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/message.cpp b/proton-c/bindings/cpp/src/message.cpp new file mode 100644 index 0000000..62dca4d --- /dev/null +++ b/proton-c/bindings/cpp/src/message.cpp @@ -0,0 +1,252 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "proton/message.hpp" +#include "proton/error.hpp" +#include "proton/message.h" +#include "msg.hpp" +#include "proton_bits.hpp" +#include "proton_impl_ref.hpp" + +#include <cstring> + +namespace proton { + +template class proton_handle<pn_message_t>; + +typedef proton_impl_ref<message> PI; + +message::message() : body_(0) { + PI::ctor(*this, 0); +} +message::message(pn_message_t *p) : body_(0) { + PI::ctor(*this, p); +} +message::message(const message& m) : proton_handle<pn_message_t>(), body_(0) { + PI::copy(*this, m); +} + +// FIXME aconway 2015-06-17: message should be a value type, needs to own pn_message_t +// and do appropriate _copy and _free operations. +message& message::operator=(const message& m) { + return PI::assign(*this, m); +} +message::~message() { PI::dtor(*this); } + +namespace { +void confirm(pn_message_t * const& p) { + if (p) return; + const_cast<pn_message_t*&>(p) = pn_message(); // Correct refcount of 1 + if (!p) + throw error(MSG("No memory")); +} + +void check(int err) { + if (err) throw error(error_str(err)); +} + +void set_value(pn_data_t* d, const value& v) { + values values(d); + values.clear(); + values << v; +} + +value get_value(pn_data_t* d) { + values values(d); + values.rewind(); + return values.get<value>(); +} +} // namespace + +void message::id(const value& id) { + confirm(impl_); + set_value(pn_message_id(impl_), id); +} + +value message::id() const { + confirm(impl_); + return get_value(pn_message_id(impl_)); +} +void message::user(const std::string &id) { + confirm(impl_); + check(pn_message_set_user_id(impl_, pn_bytes(id))); +} + +std::string message::user() const { + confirm(impl_); + return str(pn_message_get_user_id(impl_)); +} + +void message::address(const std::string &addr) { + confirm(impl_); + check(pn_message_set_address(impl_, addr.c_str())); +} + +std::string message::address() const { + confirm(impl_); + const char* addr = pn_message_get_address(impl_); + return addr ? 
std::string(addr) : std::string(); +} + +void message::subject(const std::string &s) { + confirm(impl_); + check(pn_message_set_subject(impl_, s.c_str())); +} + +std::string message::subject() const { + confirm(impl_); + const char* s = pn_message_get_subject(impl_); + return s ? std::string(s) : std::string(); +} + +void message::reply_to(const std::string &s) { + confirm(impl_); + check(pn_message_set_reply_to(impl_, s.c_str())); +} + +std::string message::reply_to() const { + confirm(impl_); + const char* s = pn_message_get_reply_to(impl_); + return s ? std::string(s) : std::string(); +} + +void message::correlation_id(const value& id) { + confirm(impl_); + set_value(pn_message_correlation_id(impl_), id); +} + +value message::correlation_id() const { + confirm(impl_); + return get_value(pn_message_correlation_id(impl_)); +} + +void message::content_type(const std::string &s) { + confirm(impl_); + check(pn_message_set_content_type(impl_, s.c_str())); +} + +std::string message::content_type() const { + confirm(impl_); + const char* s = pn_message_get_content_type(impl_); + return s ? std::string(s) : std::string(); +} + +void message::content_encoding(const std::string &s) { + confirm(impl_); + check(pn_message_set_content_encoding(impl_, s.c_str())); +} + +std::string message::content_encoding() const { + confirm(impl_); + const char* s = pn_message_get_content_encoding(impl_); + return s ? 
std::string(s) : std::string(); +} + +void message::expiry(amqp_timestamp t) { + confirm(impl_); + pn_message_set_expiry_time(impl_, t.milliseconds); +} +amqp_timestamp message::expiry() const { + confirm(impl_); + return amqp_timestamp(pn_message_get_expiry_time(impl_)); +} + +void message::creation_time(amqp_timestamp t) { + confirm(impl_); + pn_message_set_creation_time(impl_, t); +} +amqp_timestamp message::creation_time() const { + confirm(impl_); + return pn_message_get_creation_time(impl_); +} + +void message::group_id(const std::string &s) { + confirm(impl_); + check(pn_message_set_group_id(impl_, s.c_str())); +} + +std::string message::group_id() const { + confirm(impl_); + const char* s = pn_message_get_group_id(impl_); + return s ? std::string(s) : std::string(); +} + +void message::reply_togroup_id(const std::string &s) { + confirm(impl_); + check(pn_message_set_reply_to_group_id(impl_, s.c_str())); +} + +std::string message::reply_togroup_id() const { + confirm(impl_); + const char* s = pn_message_get_reply_to_group_id(impl_); + return s ? 
std::string(s) : std::string(); +} + +void message::body(const value& v) { + confirm(impl_); + set_value(pn_message_body(impl_), v); +} + +void message::body(const values& v) { + confirm(impl_); + pn_data_copy(pn_message_body(impl_), v.data_); +} + +const values& message::body() const { + confirm(impl_); + body_.view(pn_message_body(impl_)); + return body_; +} + +values& message::body() { + confirm(impl_); + body_.view(pn_message_body(impl_)); + return body_; +} + +void message::encode(std::string &s) { + confirm(impl_); + size_t sz = s.capacity(); + if (sz < 512) sz = 512; + while (true) { + s.resize(sz); + int err = pn_message_encode(impl_, (char *) s.data(), &sz); + if (err) { + if (err != PN_OVERFLOW) + check(err); + } else { + s.resize(sz); + return; + } + sz *= 2; + } +} + +void message::decode(const std::string &s) { + confirm(impl_); + check(pn_message_decode(impl_, s.data(), s.size())); +} + +pn_message_t *message::pn_message() const { + return impl_; +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/messaging_adapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp new file mode 100644 index 0000000..433b254 --- /dev/null +++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp @@ -0,0 +1,411 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/messaging_adapter.hpp" +#include "proton/messaging_event.hpp" +#include "proton/sender.hpp" +#include "proton/error.hpp" +#include "msg.hpp" + +#include "proton/link.h" +#include "proton/handlers.h" +#include "proton/delivery.h" +#include "proton/connection.h" +#include "proton/session.h" + +namespace proton { +messaging_adapter::messaging_adapter(messaging_handler &delegate_) : + messaging_handler(true, delegate_.prefetch_, delegate_.auto_settle_, delegate_.auto_accept_, delegate_.peer_close_iserror_), + delegate_(delegate_) +{} + + +messaging_adapter::~messaging_adapter(){} + + +void messaging_adapter::on_reactor_init(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + messaging_event mevent(PN_MESSAGING_START, *pe); + delegate_.on_start(mevent); + } +} + +void messaging_adapter::on_link_flow(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_event_t *pne = pe->pn_event(); + pn_link_t *lnk = pn_event_link(pne); + if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0) { + // create on_message extended event + messaging_event mevent(PN_MESSAGING_SENDABLE, *pe); + delegate_.on_sendable(mevent);; + } + } +} + +namespace { +message receive_message(pn_link_t *lnk, pn_delivery_t *dlv) { + std::string buf; + size_t sz = pn_delivery_pending(dlv); + buf.resize(sz); + ssize_t n = pn_link_recv(lnk, (char *) buf.data(), sz); + if (n != (ssize_t) sz) + throw error(MSG("link read failure")); + message m; + m. 
decode(buf); + pn_link_advance(lnk); + return m; +} +} // namespace + +// Translates a delivery event into delegate callbacks: on_message / on_settled for receiver links, on_accepted / on_rejected / on_released / on_settled for sender links. +void messaging_adapter::on_delivery(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_event_t *cevent = pe->pn_event(); + pn_link_t *lnk = pn_event_link(cevent); + pn_delivery_t *dlv = pn_event_delivery(cevent); + + if (pn_link_is_receiver(lnk)) { + if (!pn_delivery_partial(dlv) && pn_delivery_readable(dlv)) { + // generate on_message + messaging_event mevent(PN_MESSAGING_MESSAGE, *pe); + message m(receive_message(lnk, dlv)); + mevent.message(m); + if (pn_link_state(lnk) & PN_LOCAL_CLOSED) { + if (auto_accept_) { + pn_delivery_update(dlv, PN_RELEASED); + pn_delivery_settle(dlv); + } + } + else { + try { + delegate_.on_message(mevent); + if (auto_accept_) { + pn_delivery_update(dlv, PN_ACCEPTED); + pn_delivery_settle(dlv); + } + } + catch (message_reject &) { + pn_delivery_update(dlv, PN_REJECTED); + pn_delivery_settle(dlv); + } + catch (message_release &) { + // message_release maps to the RELEASED outcome; REJECTED is reserved for message_reject above + pn_delivery_update(dlv, PN_RELEASED); + pn_delivery_settle(dlv); + } + } + } + else if (pn_delivery_updated(dlv) && pn_delivery_settled(dlv)) { + messaging_event mevent(PN_MESSAGING_SETTLED, *pe); + delegate_.on_settled(mevent); + } + } else { + // sender + if (pn_delivery_updated(dlv)) { + std::uint64_t rstate = pn_delivery_remote_state(dlv); + if (rstate == PN_ACCEPTED) { + messaging_event mevent(PN_MESSAGING_ACCEPTED, *pe); + delegate_.on_accepted(mevent); + } + else if (rstate == PN_REJECTED) { + messaging_event mevent(PN_MESSAGING_REJECTED, *pe); + delegate_.on_rejected(mevent); + } + else if (rstate == PN_RELEASED || rstate == PN_MODIFIED) { + messaging_event mevent(PN_MESSAGING_RELEASED, *pe); + delegate_.on_released(mevent); + } + + if (pn_delivery_settled(dlv)) { + messaging_event mevent(PN_MESSAGING_SETTLED, *pe); + delegate_.on_settled(mevent); + } + if (auto_settle_) + pn_delivery_settle(dlv); + } + } + } +} + +namespace { + +bool is_local_open(pn_state_t state) { + return state & 
PN_LOCAL_ACTIVE; +} + +bool is_local_unititialised(pn_state_t state) { + return state & PN_LOCAL_UNINIT; +} + +bool is_local_closed(pn_state_t state) { + return state & PN_LOCAL_CLOSED; +} + +bool is_remote_open(pn_state_t state) { + return state & PN_REMOTE_ACTIVE; +} + +} // namespace + +void messaging_adapter::on_link_remote_close(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_event_t *cevent = pe->pn_event(); + pn_link_t *lnk = pn_event_link(cevent); + pn_state_t state = pn_link_state(lnk); + if (pn_condition_is_set(pn_link_remote_condition(lnk))) { + messaging_event mevent(PN_MESSAGING_LINK_ERROR, *pe); + on_link_error(mevent); + } + else if (is_local_closed(state)) { + messaging_event mevent(PN_MESSAGING_LINK_CLOSED, *pe); + on_link_closed(mevent); + } + else { + messaging_event mevent(PN_MESSAGING_LINK_CLOSING, *pe); + on_link_closing(mevent); + } + pn_link_close(lnk); + } +} + +void messaging_adapter::on_session_remote_close(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_event_t *cevent = pe->pn_event(); + pn_session_t *session = pn_event_session(cevent); + pn_state_t state = pn_session_state(session); + if (pn_condition_is_set(pn_session_remote_condition(session))) { + messaging_event mevent(PN_MESSAGING_SESSION_ERROR, *pe); + on_session_error(mevent); + } + else if (is_local_closed(state)) { + messaging_event mevent(PN_MESSAGING_SESSION_CLOSED, *pe); + on_session_closed(mevent); + } + else { + messaging_event mevent(PN_MESSAGING_SESSION_CLOSING, *pe); + on_session_closing(mevent); + } + pn_session_close(session); + } +} + +void messaging_adapter::on_connection_remote_close(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_event_t *cevent = pe->pn_event(); + pn_connection_t *connection = pn_event_connection(cevent); + pn_state_t state = pn_connection_state(connection); + if (pn_condition_is_set(pn_connection_remote_condition(connection))) { + 
messaging_event mevent(PN_MESSAGING_CONNECTION_ERROR, *pe); + on_connection_error(mevent); + } + else if (is_local_closed(state)) { + messaging_event mevent(PN_MESSAGING_CONNECTION_CLOSED, *pe); + on_connection_closed(mevent); + } + else { + messaging_event mevent(PN_MESSAGING_CONNECTION_CLOSING, *pe); + on_connection_closing(mevent); + } + pn_connection_close(connection); + } +} + +void messaging_adapter::on_connection_local_open(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_connection_t *connection = pn_event_connection(pe->pn_event()); + if (is_remote_open(pn_connection_state(connection))) { + messaging_event mevent(PN_MESSAGING_CONNECTION_OPENED, *pe); + on_connection_opened(mevent); + } + } +} + +void messaging_adapter::on_connection_remote_open(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_connection_t *connection = pn_event_connection(pe->pn_event()); + if (is_local_open(pn_connection_state(connection))) { + messaging_event mevent(PN_MESSAGING_CONNECTION_OPENED, *pe); + on_connection_opened(mevent); + } + else if (is_local_unititialised(pn_connection_state(connection))) { + messaging_event mevent(PN_MESSAGING_CONNECTION_OPENING, *pe); + on_connection_opening(mevent); + pn_connection_open(connection); + } + } +} + +void messaging_adapter::on_session_local_open(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_session_t *session = pn_event_session(pe->pn_event()); + if (is_remote_open(pn_session_state(session))) { + messaging_event mevent(PN_MESSAGING_SESSION_OPENED, *pe); + on_session_opened(mevent); + } + } +} + +void messaging_adapter::on_session_remote_open(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_session_t *session = pn_event_session(pe->pn_event()); + if (is_local_open(pn_session_state(session))) { + messaging_event mevent(PN_MESSAGING_SESSION_OPENED, *pe); + on_session_opened(mevent); + } + else if 
(is_local_unititialised(pn_session_state(session))) { + messaging_event mevent(PN_MESSAGING_SESSION_OPENING, *pe); + on_session_opening(mevent); + pn_session_open(session); + } + } +} + +void messaging_adapter::on_link_local_open(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_link_t *link = pn_event_link(pe->pn_event()); + if (is_remote_open(pn_link_state(link))) { + messaging_event mevent(PN_MESSAGING_LINK_OPENED, *pe); + on_link_opened(mevent); + } + } +} + +void messaging_adapter::on_link_remote_open(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_link_t *link = pn_event_link(pe->pn_event()); + if (is_local_open(pn_link_state(link))) { + messaging_event mevent(PN_MESSAGING_LINK_OPENED, *pe); + on_link_opened(mevent); + } + else if (is_local_unititialised(pn_link_state(link))) { + messaging_event mevent(PN_MESSAGING_LINK_OPENING, *pe); + on_link_opening(mevent); + pn_link_open(link); + } + } +} + +void messaging_adapter::on_transport_tail_closed(event &e) { + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_connection_t *conn = pn_event_connection(pe->pn_event()); + if (conn && is_local_open(pn_connection_state(conn))) { + messaging_event mevent(PN_MESSAGING_DISCONNECTED, *pe); + delegate_.on_disconnected(mevent); + } + } +} + + +void messaging_adapter::on_connection_opened(event &e) { + delegate_.on_connection_opened(e); +} + +void messaging_adapter::on_session_opened(event &e) { + delegate_.on_session_opened(e); +} + +void messaging_adapter::on_link_opened(event &e) { + delegate_.on_link_opened(e); +} + +void messaging_adapter::on_connection_opening(event &e) { + delegate_.on_connection_opening(e); +} + +void messaging_adapter::on_session_opening(event &e) { + delegate_.on_session_opening(e); +} + +void messaging_adapter::on_link_opening(event &e) { + delegate_.on_link_opening(e); +} + +void messaging_adapter::on_connection_error(event &e) { + 
delegate_.on_connection_error(e); + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_connection_t *connection = pn_event_connection(pe->pn_event()); + pn_connection_close(connection); + } +} + +void messaging_adapter::on_session_error(event &e) { + delegate_.on_session_error(e); + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_session_t *session = pn_event_session(pe->pn_event()); + pn_session_close(session); + } +} + +void messaging_adapter::on_link_error(event &e) { + delegate_.on_link_error(e); + proton_event *pe = dynamic_cast<proton_event*>(&e); + if (pe) { + pn_link_t *link = pn_event_link(pe->pn_event()); + pn_link_close(link); + } +} + +void messaging_adapter::on_connection_closed(event &e) { + delegate_.on_connection_closed(e); +} + +void messaging_adapter::on_session_closed(event &e) { + delegate_.on_session_closed(e); +} + +void messaging_adapter::on_link_closed(event &e) { + delegate_.on_link_closed(e); +} + +void messaging_adapter::on_connection_closing(event &e) { + delegate_.on_connection_closing(e); + if (peer_close_iserror_) + on_connection_error(e); +} + +void messaging_adapter::on_session_closing(event &e) { + delegate_.on_session_closing(e); + if (peer_close_iserror_) + on_session_error(e); +} + +void messaging_adapter::on_link_closing(event &e) { + delegate_.on_link_closing(e); + if (peer_close_iserror_) + on_link_error(e); +} + +void messaging_adapter::on_unhandled(event &e) { +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/messaging_event.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_event.cpp b/proton-c/bindings/cpp/src/messaging_event.cpp new file mode 100644 index 0000000..b5da375 --- /dev/null +++ b/proton-c/bindings/cpp/src/messaging_event.cpp @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/reactor.h" +#include "proton/event.h" +#include "proton/link.h" + +#include "proton/messaging_event.hpp" +#include "proton/message.hpp" +#include "proton/proton_handler.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/error.hpp" +#include "msg.hpp" +#include "contexts.hpp" + +namespace proton { + +messaging_event::messaging_event(pn_event_t *ce, pn_event_type_t t, class container &c) : + proton_event(ce, t, c), messaging_type_(PN_MESSAGING_PROTON), parent_event_(0), message_(0) +{} + +messaging_event::messaging_event(messaging_event_type_t t, proton_event &p) : + proton_event(NULL, PN_EVENT_NONE, p.container()), messaging_type_(t), parent_event_(&p), message_(0) { + if (messaging_type_ == PN_MESSAGING_PROTON) + throw error(MSG("invalid messaging event type")); +} + +messaging_event::~messaging_event() { + delete message_; +} + +connection &messaging_event::connection() { + if (messaging_type_ == PN_MESSAGING_PROTON) + return proton_event::connection(); + if (parent_event_) + return parent_event_->connection(); + throw error(MSG("No connection context for event")); +} + +sender messaging_event::sender() { + if (messaging_type_ == PN_MESSAGING_PROTON) + return proton_event::sender(); + if (parent_event_) + return 
parent_event_->sender(); + throw error(MSG("No sender context for event")); +} + +receiver messaging_event::receiver() { + if (messaging_type_ == PN_MESSAGING_PROTON) + return proton_event::receiver(); + if (parent_event_) + return parent_event_->receiver(); + throw error(MSG("No receiver context for event")); +} + +link messaging_event::link() { + if (messaging_type_ == PN_MESSAGING_PROTON) + return proton_event::link(); + if (parent_event_) + return parent_event_->link(); + throw error(MSG("No link context for event")); +} + +message messaging_event::message() { + if (parent_event_) { + pn_message_t *m = event_context(parent_event_->pn_event()); + if (m) + return proton::message(m); + } + throw error(MSG("No message context for event")); +} + +void messaging_event::message(class message &m) { + if (messaging_type_ != PN_MESSAGING_MESSAGE || !parent_event_) + throw error(MSG("event type does not provide message")); + event_context(parent_event_->pn_event(), m.pn_message()); +} + +void messaging_event::dispatch(handler &h) { + if (messaging_type_ == PN_MESSAGING_PROTON) { + proton_event::dispatch(h); + return; + } + + messaging_handler *handler = dynamic_cast<messaging_handler*>(&h); + if (handler) { + switch(messaging_type_) { + + case PN_MESSAGING_START: handler->on_start(*this); break; + case PN_MESSAGING_SENDABLE: handler->on_sendable(*this); break; + case PN_MESSAGING_MESSAGE: handler->on_message(*this); break; + case PN_MESSAGING_ACCEPTED: handler->on_accepted(*this); break; + case PN_MESSAGING_REJECTED: handler->on_rejected(*this); break; + case PN_MESSAGING_RELEASED: handler->on_released(*this); break; + case PN_MESSAGING_SETTLED: handler->on_settled(*this); break; + + case PN_MESSAGING_CONNECTION_CLOSING: handler->on_connection_closing(*this); break; + case PN_MESSAGING_CONNECTION_CLOSED: handler->on_connection_closed(*this); break; + case PN_MESSAGING_CONNECTION_ERROR: handler->on_connection_error(*this); break; + case PN_MESSAGING_CONNECTION_OPENING: 
handler->on_connection_opening(*this); break; + case PN_MESSAGING_CONNECTION_OPENED: handler->on_connection_opened(*this); break; + + case PN_MESSAGING_LINK_CLOSED: handler->on_link_closed(*this); break; + case PN_MESSAGING_LINK_CLOSING: handler->on_link_closing(*this); break; + case PN_MESSAGING_LINK_ERROR: handler->on_link_error(*this); break; + case PN_MESSAGING_LINK_OPENING: handler->on_link_opening(*this); break; + case PN_MESSAGING_LINK_OPENED: handler->on_link_opened(*this); break; + + case PN_MESSAGING_SESSION_CLOSED: handler->on_session_closed(*this); break; + case PN_MESSAGING_SESSION_CLOSING: handler->on_session_closing(*this); break; + case PN_MESSAGING_SESSION_ERROR: handler->on_session_error(*this); break; + case PN_MESSAGING_SESSION_OPENING: handler->on_session_opening(*this); break; + case PN_MESSAGING_SESSION_OPENED: handler->on_session_opened(*this); break; + + case PN_MESSAGING_TRANSPORT_CLOSED: handler->on_transport_closed(*this); break; + default: + throw error(MSG("Unkown messaging event type " << messaging_type_)); + break; + } + } else { + h.on_unhandled(*this); + } + + // recurse through children + for (handler::iterator child = h.begin(); child != h.end(); ++child) { + dispatch(**child); + } +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/messaging_handler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_handler.cpp b/proton-c/bindings/cpp/src/messaging_handler.cpp new file mode 100644 index 0000000..184cc9e --- /dev/null +++ b/proton-c/bindings/cpp/src/messaging_handler.cpp @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/messaging_handler.hpp" +#include "proton/proton_event.hpp" +#include "proton/messaging_adapter.hpp" +#include "proton/handlers.h" + +namespace proton { + +namespace { +/** + * Handler that forwards link-open, link-flow and delivery events to the + * C handler returned by pn_flowcontroller(window). The C handler is + * released with pn_decref() when this object is destroyed. + */ +class Cflow_controller : public proton_handler +{ + public: + pn_handler_t *flowcontroller; + + Cflow_controller(int window) : flowcontroller(pn_flowcontroller(window)) {} + ~Cflow_controller() { + pn_decref(flowcontroller); + } + + /** Dispatch the pn_event_t carried by e to the wrapped C handler. */ + void redirect(event &e) { + proton_event *pne = dynamic_cast<proton_event *>(&e); + /* Only a proton_event carries a pn_event_t; ignore anything else rather than dereference a null pointer. */ + if (!pne) return; + pn_handler_dispatch(flowcontroller, pne->pn_event(), static_cast<pn_event_type_t>(pne->type())); + } + + virtual void on_link_local_open(event &e) { redirect(e); } + virtual void on_link_remote_open(event &e) { redirect(e); } + virtual void on_link_flow(event &e) { redirect(e); } + virtual void on_delivery(event &e) { redirect(e); } +}; + +} // namespace + + + + +messaging_handler::messaging_handler(int prefetch0, bool auto_accept0, bool auto_settle0, bool peer_close_isError0) : + prefetch_(prefetch0), auto_accept_(auto_accept0), auto_settle_(auto_settle0), peer_close_iserror_(peer_close_isError0) +{ + create_helpers(); +} + +messaging_handler::messaging_handler(bool raw_handler, int prefetch0, bool auto_accept0, bool auto_settle0, + bool peer_close_isError0) : + prefetch_(prefetch0), auto_accept_(auto_accept0), auto_settle_(auto_settle0), peer_close_iserror_(peer_close_isError0) +{ + if
(raw_handler) { + flow_controller_ = 0; + messaging_adapter_ = 0; + } else { + create_helpers(); + } +} + +/** + * Create the child handlers: a Cflow_controller when prefetch_ > 0, and + * always a messaging_adapter wrapping this handler. + */ +void messaging_handler::create_helpers() { + /* The destructor deletes flow_controller_ unconditionally, so it must + * hold a defined value even when no flow controller is created. */ + flow_controller_ = 0; + if (prefetch_ > 0) { + flow_controller_ = new Cflow_controller(prefetch_); + add_child_handler(*flow_controller_); + } + messaging_adapter_ = new messaging_adapter(*this); + add_child_handler(*messaging_adapter_); +} + +/** Delete the helper handlers; both pointers are null for a raw handler. */ +messaging_handler::~messaging_handler(){ + delete flow_controller_; + delete messaging_adapter_; +} + +/* Default callback implementations: each one forwards to on_unhandled(). */ +void messaging_handler::on_abort(event &e) { on_unhandled(e); } +void messaging_handler::on_accepted(event &e) { on_unhandled(e); } +void messaging_handler::on_commit(event &e) { on_unhandled(e); } +void messaging_handler::on_connection_closed(event &e) { on_unhandled(e); } +void messaging_handler::on_connection_closing(event &e) { on_unhandled(e); } +void messaging_handler::on_connection_error(event &e) { on_unhandled(e); } +void messaging_handler::on_connection_opened(event &e) { on_unhandled(e); } +void messaging_handler::on_connection_opening(event &e) { on_unhandled(e); } +void messaging_handler::on_disconnected(event &e) { on_unhandled(e); } +void messaging_handler::on_fetch(event &e) { on_unhandled(e); } +void messaging_handler::on_idLoaded(event &e) { on_unhandled(e); } +void messaging_handler::on_link_closed(event &e) { on_unhandled(e); } +void messaging_handler::on_link_closing(event &e) { on_unhandled(e); } +void messaging_handler::on_link_error(event &e) { on_unhandled(e); } +void messaging_handler::on_link_opened(event &e) { on_unhandled(e); } +void messaging_handler::on_link_opening(event &e) { on_unhandled(e); } +void messaging_handler::on_message(event &e) { on_unhandled(e); } +void messaging_handler::on_quit(event &e) { on_unhandled(e); } +void messaging_handler::on_record_inserted(event &e) { on_unhandled(e); } +void messaging_handler::on_records_loaded(event &e) { on_unhandled(e); } +void messaging_handler::on_rejected(event &e) { on_unhandled(e); } +void
messaging_handler::on_released(event &e) { on_unhandled(e); } +void messaging_handler::on_request(event &e) { on_unhandled(e); } +void messaging_handler::on_response(event &e) { on_unhandled(e); } +void messaging_handler::on_sendable(event &e) { on_unhandled(e); } +void messaging_handler::on_session_closed(event &e) { on_unhandled(e); } +void messaging_handler::on_session_closing(event &e) { on_unhandled(e); } +void messaging_handler::on_session_error(event &e) { on_unhandled(e); } +void messaging_handler::on_session_opened(event &e) { on_unhandled(e); } +void messaging_handler::on_session_opening(event &e) { on_unhandled(e); } +void messaging_handler::on_settled(event &e) { on_unhandled(e); } +void messaging_handler::on_start(event &e) { on_unhandled(e); } +void messaging_handler::on_timer(event &e) { on_unhandled(e); } +void messaging_handler::on_transaction_aborted(event &e) { on_unhandled(e); } +void messaging_handler::on_transaction_committed(event &e) { on_unhandled(e); } +void messaging_handler::on_transaction_declared(event &e) { on_unhandled(e); } +void messaging_handler::on_transport_closed(event &e) { on_unhandled(e); } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/msg.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/msg.hpp b/proton-c/bindings/cpp/src/msg.hpp new file mode 100644 index 0000000..b5d2a51 --- /dev/null +++ b/proton-c/bindings/cpp/src/msg.hpp @@ -0,0 +1,58 @@ +#ifndef PROTON_MSG_H +#define PROTON_MSG_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sstream> +#include <iostream> + +namespace proton { + +/** A simple wrapper for std::ostringstream that allows + * in place construction of a message and automatic conversion + * to string. + * E.g. + *@code + * void foo(const std::string&); + * foo(msg() << "hello " << 32); + *@endcode + * Will construct the string "hello 32" and pass it to foo() + */ +struct msg { + std::ostringstream os; + msg() {} + /* Copy by streaming the text in, so that later << calls append to it. + * An ostringstream constructed directly from a string starts writing at + * position 0 and would overwrite the copied text. */ + msg(const msg& m) : os() { os << m.str(); } + /** The text accumulated so far. */ + std::string str() const { return os.str(); } + operator std::string() const { return str(); } + /** Append t using its stream insertion operator; returns *this for chaining. */ + template <class T> msg& operator<<(const T& t) { os <<t; return *this; } +}; + +/** Write the accumulated text of m to o. */ +inline std::ostream& operator<<(std::ostream& o, const msg& m) { return o << m.str(); } + +/** Construct a message using operator <<. Note: MSG does not append (file:line); QUOTE(x) is available to stringify x after macro expansion. */ +#define QUOTe_(x) #x +#define QUOTE(x) QUOTe_(x) +#define MSG(message) (::proton::msg() << message) + +} + +#endif /*!PROTON_MSG_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/private_impl_ref.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/private_impl_ref.hpp b/proton-c/bindings/cpp/src/private_impl_ref.hpp new file mode 100644 index 0000000..2935d79 --- /dev/null +++ b/proton-c/bindings/cpp/src/private_impl_ref.hpp @@ -0,0 +1,96 @@ +#ifndef PROTON_CPP_PRIVATEIMPL_H +#define PROTON_CPP_PRIVATEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/export.hpp" + +namespace proton { + +// Modified from qpid::messaging version to work without +// boost::intrusive_ptr but integrate with Proton's pn_class_t +// reference counting. Thread safety currently absent but fluid in +// intention... + + +/** + * Helper class to implement a class with a private, reference counted + * implementation and reference semantics. + * + * Such classes are used in the public API to hide implementation, they + * should. Example of use: + * + * === Foo.h + * + * template <class T> private_impl_ref; + * class foo_impl; + * + * Foo : public handle<foo_impl> { + * public: + * Foo(foo_impl* = 0); + * Foo(const Foo&); + * ~Foo(); + * Foo& operator=(const Foo&); + * + * int foo_do(); // and other Foo functions... 
+ * + * private: + * typedef foo_impl Impl; + * Impl* impl_; + * friend class private_impl_ref<Foo>; + * + * === Foo.cpp + * + * typedef private_impl_ref<Foo> PI; + * Foo::Foo(foo_impl* p) { PI::ctor(*this, p); } + * Foo::Foo(const Foo& c) : handle<foo_impl>() { PI::copy(*this, c); } + * Foo::~Foo() { PI::dtor(*this); } + * Foo& Foo::operator=(const Foo& c) { return PI::assign(*this, c); } + * + * int foo::foo_do() { return impl_->foo_do(); } + * + */ +template <class T> class private_impl_ref { + public: + typedef typename T::Impl Impl; + + /** Get the implementation pointer from a handle */ + static Impl* get(const T& t) { return t.impl_; } + + /** + * Set the implementation pointer in a handle. + * The new implementation is referenced before the old one is released, + * so the new object stays alive even if releasing the old one would + * drop the last other reference to it. + */ + static void set(T& t, const Impl* p) { + if (t.impl_ == p) return; + Impl* old = t.impl_; + t.impl_ = const_cast<Impl *>(p); + if (t.impl_) Impl::incref(t.impl_); + if (old) Impl::decref(old); + } + + // Helper functions to implement the ctor, dtor, copy, assign + static void ctor(T& t, Impl* p) { t.impl_ = p; if (p) Impl::incref(p); } + static void copy(T& t, const T& x) { if (&t == &x) return; t.impl_ = 0; assign(t, x); } + static void dtor(T& t) { if(t.impl_) Impl::decref(t.impl_); } + static T& assign(T& t, const T& x) { set(t, get(x)); return t;} +}; + +} + +#endif /*!PROTON_CPP_PRIVATEIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/proton_bits.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_bits.cpp b/proton-c/bindings/cpp/src/proton_bits.cpp index e06589c..aea18a3 100644 --- a/proton-c/bindings/cpp/src/proton_bits.cpp +++ b/proton-c/bindings/cpp/src/proton_bits.cpp @@ -23,7 +23,7 @@ #include <proton/object.h> #include "proton_bits.hpp" -std::string errorStr(int code) { +std::string error_str(int code) { switch (code) { case 0: return "ok"; @@ -39,15 +39,15 @@ std::string errorStr(int code) { } } -std::string errorStr(pn_error_t* err, int code) {
+std::string error_str(pn_error_t* err, int code) { if (err && pn_error_code(err)) { const char* text = pn_error_text(err); - return text ? std::string(text) : errorStr(pn_error_code(err)); + return text ? std::string(text) : error_str(pn_error_code(err)); } - return errorStr(code); + return error_str(code); } -std::ostream& operator<<(std::ostream& o, const PnObject& object) { +std::ostream& operator<<(std::ostream& o, const pn_object& object) { pn_string_t* str = pn_string(""); pn_inspect(object.value, str); o << pn_string_get(str); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/proton_bits.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_bits.hpp b/proton-c/bindings/cpp/src/proton_bits.hpp index 803dae1..4dd9ad4 100644 --- a/proton-c/bindings/cpp/src/proton_bits.hpp +++ b/proton-c/bindings/cpp/src/proton_bits.hpp @@ -28,16 +28,16 @@ * Assorted internal proton utilities. */ -std::string errorStr(int code); +std::string error_str(int code); /** Print the error string from pn_error_t, or from code if pn_error_t has no error. */ -std::string errorStr(pn_error_t*, int code=0); +std::string error_str(pn_error_t*, int code=0); /** Wrapper for a proton object pointer. */ -struct PnObject { void* value; PnObject(void* o) : value(o) {} }; +struct pn_object { void* value; pn_object(void* o) : value(o) {} }; /** Stream a proton object via pn_inspect. 
*/ -std::ostream& operator<<(std::ostream& o, const PnObject& object); +std::ostream& operator<<(std::ostream& o, const pn_object& object); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/proton_event.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_event.cpp b/proton-c/bindings/cpp/src/proton_event.cpp new file mode 100644 index 0000000..2d2f7d5 --- /dev/null +++ b/proton-c/bindings/cpp/src/proton_event.cpp @@ -0,0 +1,151 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "proton/reactor.h" +#include "proton/event.h" +#include "proton/link.h" + +#include "proton/proton_event.hpp" +#include "proton/proton_handler.hpp" +#include "proton/error.hpp" +#include "proton/container.hpp" + +#include "connection_impl.hpp" +#include "msg.hpp" +#include "contexts.hpp" + +namespace proton { + +proton_event::proton_event(pn_event_t *ce, pn_event_type_t t, class container &c) : + pn_event_(ce), + type_((int) t), + container_(c) +{} + +int proton_event::type() { return type_; } + +pn_event_t *proton_event::pn_event() { return pn_event_; } + +container &proton_event::container() { return container_; } + +connection &proton_event::connection() { + pn_connection_t *conn = pn_event_connection(pn_event()); + if (!conn) + throw error(MSG("No connection context for this event")); + return connection_impl::reactor_reference(conn); +} + +sender proton_event::sender() { + pn_link_t *lnk = pn_event_link(pn_event()); + if (lnk && pn_link_is_sender(lnk)) + return proton::sender(lnk); + throw error(MSG("No sender context for this event")); +} + +receiver proton_event::receiver() { + pn_link_t *lnk = pn_event_link(pn_event()); + if (lnk && pn_link_is_receiver(lnk)) + return proton::receiver(lnk); + throw error(MSG("No receiver context for this event")); +} + +link proton_event::link() { + pn_link_t *lnk = pn_event_link(pn_event()); + if (lnk) { + if (pn_link_is_sender(lnk)) + return proton::sender(lnk); + else + return proton::receiver(lnk); + } + throw error(MSG("No link context for this event")); +} + + + + +void proton_event::dispatch(handler &h) { + proton_handler *handler = dynamic_cast<proton_handler*>(&h); + + if (handler) { + switch(type_) { + + case PN_REACTOR_INIT: handler->on_reactor_init(*this); break; + case PN_REACTOR_QUIESCED: handler->on_reactor_quiesced(*this); break; + case PN_REACTOR_FINAL: handler->on_reactor_final(*this); break; + + case PN_TIMER_TASK: handler->on_timer_task(*this); break; + + case PN_CONNECTION_INIT: 
handler->on_connection_init(*this); break; + case PN_CONNECTION_BOUND: handler->on_connection_bound(*this); break; + case PN_CONNECTION_UNBOUND: handler->on_connection_unbound(*this); break; + case PN_CONNECTION_LOCAL_OPEN: handler->on_connection_local_open(*this); break; + case PN_CONNECTION_LOCAL_CLOSE: handler->on_connection_local_close(*this); break; + case PN_CONNECTION_REMOTE_OPEN: handler->on_connection_remote_open(*this); break; + case PN_CONNECTION_REMOTE_CLOSE: handler->on_connection_remote_close(*this); break; + case PN_CONNECTION_FINAL: handler->on_connection_final(*this); break; + + case PN_SESSION_INIT: handler->on_session_init(*this); break; + case PN_SESSION_LOCAL_OPEN: handler->on_session_local_open(*this); break; + case PN_SESSION_LOCAL_CLOSE: handler->on_session_local_close(*this); break; + case PN_SESSION_REMOTE_OPEN: handler->on_session_remote_open(*this); break; + case PN_SESSION_REMOTE_CLOSE: handler->on_session_remote_close(*this); break; + case PN_SESSION_FINAL: handler->on_session_final(*this); break; + + case PN_LINK_INIT: handler->on_link_init(*this); break; + case PN_LINK_LOCAL_OPEN: handler->on_link_local_open(*this); break; + case PN_LINK_LOCAL_CLOSE: handler->on_link_local_close(*this); break; + case PN_LINK_LOCAL_DETACH: handler->on_link_local_detach(*this); break; + case PN_LINK_REMOTE_OPEN: handler->on_link_remote_open(*this); break; + case PN_LINK_REMOTE_CLOSE: handler->on_link_remote_close(*this); break; + case PN_LINK_REMOTE_DETACH: handler->on_link_remote_detach(*this); break; + case PN_LINK_FLOW: handler->on_link_flow(*this); break; + case PN_LINK_FINAL: handler->on_link_final(*this); break; + + case PN_DELIVERY: handler->on_delivery(*this); break; + + case PN_TRANSPORT: handler->on_transport(*this); break; + case PN_TRANSPORT_ERROR: handler->on_transport_error(*this); break; + case PN_TRANSPORT_HEAD_CLOSED: handler->on_transport_head_closed(*this); break; + case PN_TRANSPORT_TAIL_CLOSED: 
handler->on_transport_tail_closed(*this); break; + case PN_TRANSPORT_CLOSED: handler->on_transport_closed(*this); break; + + case PN_SELECTABLE_INIT: handler->on_selectable_init(*this); break; + case PN_SELECTABLE_UPDATED: handler->on_selectable_updated(*this); break; + case PN_SELECTABLE_READABLE: handler->on_selectable_readable(*this); break; + case PN_SELECTABLE_WRITABLE: handler->on_selectable_writable(*this); break; + case PN_SELECTABLE_EXPIRED: handler->on_selectable_expired(*this); break; + case PN_SELECTABLE_ERROR: handler->on_selectable_error(*this); break; + case PN_SELECTABLE_FINAL: handler->on_selectable_final(*this); break; + default: + throw error(MSG("Invalid Proton event type " << type_)); + break; + } + } else { + h.on_unhandled(*this); + } + + // recurse through children + for (handler::iterator child = h.begin(); child != h.end(); ++child) { + dispatch(**child); + } +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/proton_handler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_handler.cpp b/proton-c/bindings/cpp/src/proton_handler.cpp new file mode 100644 index 0000000..d7c2da4 --- /dev/null +++ b/proton-c/bindings/cpp/src/proton_handler.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/proton_handler.hpp" +#include "proton/proton_event.hpp" + +namespace proton { + +proton_handler::proton_handler(){} + +// Everything goes to on_unhandled() unless overriden by subclass + +void proton_handler::on_reactor_init(event &e) { on_unhandled(e); } +void proton_handler::on_reactor_quiesced(event &e) { on_unhandled(e); } +void proton_handler::on_reactor_final(event &e) { on_unhandled(e); } +void proton_handler::on_timer_task(event &e) { on_unhandled(e); } +void proton_handler::on_connection_init(event &e) { on_unhandled(e); } +void proton_handler::on_connection_bound(event &e) { on_unhandled(e); } +void proton_handler::on_connection_unbound(event &e) { on_unhandled(e); } +void proton_handler::on_connection_local_open(event &e) { on_unhandled(e); } +void proton_handler::on_connection_local_close(event &e) { on_unhandled(e); } +void proton_handler::on_connection_remote_open(event &e) { on_unhandled(e); } +void proton_handler::on_connection_remote_close(event &e) { on_unhandled(e); } +void proton_handler::on_connection_final(event &e) { on_unhandled(e); } +void proton_handler::on_session_init(event &e) { on_unhandled(e); } +void proton_handler::on_session_local_open(event &e) { on_unhandled(e); } +void proton_handler::on_session_local_close(event &e) { on_unhandled(e); } +void proton_handler::on_session_remote_open(event &e) { on_unhandled(e); } +void proton_handler::on_session_remote_close(event &e) { on_unhandled(e); } +void proton_handler::on_session_final(event &e) { on_unhandled(e); } +void 
proton_handler::on_link_init(event &e) { on_unhandled(e); } +void proton_handler::on_link_local_open(event &e) { on_unhandled(e); } +void proton_handler::on_link_local_close(event &e) { on_unhandled(e); } +void proton_handler::on_link_local_detach(event &e) { on_unhandled(e); } +void proton_handler::on_link_remote_open(event &e) { on_unhandled(e); } +void proton_handler::on_link_remote_close(event &e) { on_unhandled(e); } +void proton_handler::on_link_remote_detach(event &e) { on_unhandled(e); } +void proton_handler::on_link_flow(event &e) { on_unhandled(e); } +void proton_handler::on_link_final(event &e) { on_unhandled(e); } +void proton_handler::on_delivery(event &e) { on_unhandled(e); } +void proton_handler::on_transport(event &e) { on_unhandled(e); } +void proton_handler::on_transport_error(event &e) { on_unhandled(e); } +void proton_handler::on_transport_head_closed(event &e) { on_unhandled(e); } +void proton_handler::on_transport_tail_closed(event &e) { on_unhandled(e); } +void proton_handler::on_transport_closed(event &e) { on_unhandled(e); } +void proton_handler::on_selectable_init(event &e) { on_unhandled(e); } +void proton_handler::on_selectable_updated(event &e) { on_unhandled(e); } +void proton_handler::on_selectable_readable(event &e) { on_unhandled(e); } +void proton_handler::on_selectable_writable(event &e) { on_unhandled(e); } +void proton_handler::on_selectable_expired(event &e) { on_unhandled(e); } +void proton_handler::on_selectable_error(event &e) { on_unhandled(e); } +void proton_handler::on_selectable_final(event &e) { on_unhandled(e); } + +void proton_handler::on_unhandled(event &e) {} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/proton_impl_ref.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_impl_ref.hpp b/proton-c/bindings/cpp/src/proton_impl_ref.hpp new file mode 100644 index 0000000..795754a --- /dev/null +++ 
b/proton-c/bindings/cpp/src/proton_impl_ref.hpp @@ -0,0 +1,65 @@ +#ifndef PROTON_CPP_PROTONIMPL_H +#define PROTON_CPP_PROTONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/export.hpp" +#include "proton/object.h" + +namespace proton { + +// Modified from qpid::messaging version to work without +// boost::intrusive_ptr but integrate with Proton's pn_class_t +// reference counting. Thread safety currently absent but fluid in +// intention... + + +/** + * See private_impl_ref.h This is for lightly wrapped Proton pn_object_t targets. 
+ * class Foo : proton_handle<pn_foo_t> {...} + */ + +template <class T> class proton_impl_ref { + public: + typedef typename T::Impl Impl; + + /** Get the implementation pointer from a handle */ + static Impl* get(const T& t) { return t.impl_; } + + /** Set the implementation pointer in a handle */ + static void set(T& t, const Impl* p) { + if (t.impl_ == p) return; + if (t.impl_) pn_decref(t.impl_); + t.impl_ = const_cast<Impl *>(p); + if (t.impl_) pn_incref(t.impl_); + } + + // Helper functions to implement the ctor, dtor, copy, assign + static void ctor(T& t, Impl* p) { t.impl_ = p; if (p) pn_incref(p); } + static void copy(T& t, const T& x) { if (&t == &x) return; t.impl_ = 0; assign(t, x); } + static void dtor(T& t) { if(t.impl_) pn_decref(t.impl_); } + static T& assign(T& t, const T& x) { set(t, get(x)); return t;} +}; + +} + +#endif /*!PROTON_CPP_PROTONIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/receiver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp new file mode 100644 index 0000000..84412e6 --- /dev/null +++ b/proton-c/bindings/cpp/src/receiver.cpp @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/link.hpp" +#include "proton/receiver.hpp" +#include "proton/error.hpp" +#include "msg.hpp" + +#include "proton/connection.h" +#include "proton/session.h" +#include "proton/link.h" + +namespace proton { + + +receiver::receiver(pn_link_t *lnk) : link(lnk) {} +receiver::receiver() : link(0) {} + +receiver::receiver(const link& c) : link(c.pn_link()) {} + +void receiver::verify_type(pn_link_t *lnk) { + if (lnk && pn_link_is_sender(lnk)) + throw error(MSG("Creating receiver with sender context")); +} + + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/sender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/sender.cpp b/proton-c/bindings/cpp/src/sender.cpp new file mode 100644 index 0000000..85d79da --- /dev/null +++ b/proton-c/bindings/cpp/src/sender.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +#include "proton/link.hpp" +#include "proton/sender.hpp" +#include "proton/error.hpp" +#include "msg.hpp" +#include "contexts.hpp" + +#include "proton/connection.h" +#include "proton/session.h" +#include "proton/link.h" +#include "proton/types.h" +#include "proton/codec.h" +#include "proton/message.h" +#include "proton/delivery.h" +#include <stdlib.h> +#include <string.h> + +namespace proton { + + +sender::sender(pn_link_t *lnk) : link(lnk) {} + +void sender::verify_type(pn_link_t *lnk) { + if (lnk && pn_link_is_receiver(lnk)) + throw error(MSG("Creating sender with receiver context")); +} + +sender::sender(const link& c) : link(c.pn_link()) {} + + +namespace{ +// revisit if thread safety required +std::uint64_t tag_counter = 0; +} + +delivery sender::send(message &message) { + char tag[8]; + void *ptr = &tag; + std::uint64_t id = ++tag_counter; + *((std::uint64_t *) ptr) = id; + pn_delivery_t *dlv = pn_delivery(pn_link(), pn_dtag(tag, 8)); + std::string buf; + message.encode(buf); + pn_link_t *link = pn_link(); + pn_link_send(link, buf.data(), buf.size()); + pn_link_advance(link); + if (pn_link_snd_settle_mode(link) == PN_SND_SETTLED) + pn_delivery_settle(dlv); + return delivery(dlv); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/session.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/session.cpp b/proton-c/bindings/cpp/src/session.cpp new file mode 100644 index 0000000..283b085 --- /dev/null +++ b/proton-c/bindings/cpp/src/session.cpp @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/session.hpp" +#include "contexts.hpp" + +#include "proton/connection.h" +#include "proton/session.h" +#include "proton/session.hpp" +#include "proton/connection.hpp" +#include "connection_impl.hpp" +#include "proton_impl_ref.hpp" + +namespace proton { + +template class proton_handle<pn_session_t>; +typedef proton_impl_ref<session> PI; + +session::session(pn_session_t *p) { + PI::ctor(*this, p); +} +session::session() { + PI::ctor(*this, 0); +} +session::session(const session& c) : proton_handle<pn_session_t>() { + PI::copy(*this, c); +} +session& session::operator=(const session& c) { + return PI::assign(*this, c); +} +session::~session() { + PI::dtor(*this); +} + +pn_session_t *session::pn_session() { return impl_; } + +void session::open() { + pn_session_open(impl_); +} + +connection &session::connection() { + pn_connection_t *c = pn_session_connection(impl_); + return connection_impl::reactor_reference(c); +} + +receiver session::create_receiver(std::string name) { + pn_link_t *link = pn_receiver(impl_, name.c_str()); + return receiver(link); +} + +sender session::create_sender(std::string name) { + pn_link_t *link = pn_sender(impl_, name.c_str()); + return sender(link); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/terminus.cpp ---------------------------------------------------------------------- diff 
--git a/proton-c/bindings/cpp/src/terminus.cpp b/proton-c/bindings/cpp/src/terminus.cpp new file mode 100644 index 0000000..6db09d0 --- /dev/null +++ b/proton-c/bindings/cpp/src/terminus.cpp @@ -0,0 +1,101 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/link.hpp" +#include "proton/link.h" + +namespace proton { + +template class proton_handle<pn_terminus_t>; +typedef proton_impl_ref<terminus> PI; + +// Note: the pn_terminus_t is not ref counted. We count the parent link. 
+ +terminus::terminus() : link_(0) { + impl_ = 0; +} + +terminus::terminus(pn_terminus_t *p, link *l) : link_(l) { + impl_ = p; + pn_incref(link_->pn_link()); +} +terminus::terminus(const terminus& c) : proton_handle<pn_terminus_t>() { + impl_ = c.impl_; + link_ = c.link_; + pn_incref(link_->pn_link()); +} +terminus& terminus::operator=(const terminus& c) { + if (impl_ == c.impl_) return *this; + if (impl_) pn_decref(link_->pn_link()); + impl_ = c.impl_; + link_ = c.link_; + pn_incref(link_->pn_link()); + return *this; +} +terminus::~terminus() { + if (impl_) + pn_decref(link_->pn_link()); +} + +pn_terminus_t *terminus::pn_terminus() { return impl_; } + +terminus::type_t terminus::type() { + return (type_t) pn_terminus_get_type(impl_); +} + +void terminus::type(type_t type) { + pn_terminus_set_type(impl_, (pn_terminus_type_t) type); +} + +terminus::expiry_policy_t terminus::expiry_policy() { + return (expiry_policy_t) pn_terminus_get_type(impl_); +} + +void terminus::expiry_policy(expiry_policy_t policy) { + pn_terminus_set_expiry_policy(impl_, (pn_expiry_policy_t) policy); +} + +terminus::distribution_mode_t terminus::distribution_mode() { + return (distribution_mode_t) pn_terminus_get_type(impl_); +} + +void terminus::distribution_mode(distribution_mode_t mode) { + pn_terminus_set_distribution_mode(impl_, (pn_distribution_mode_t) mode); +} + +std::string terminus::address() { + const char *addr = pn_terminus_get_address(impl_); + return addr ? 
std::string(addr) : std::string(); +} + +void terminus::address(std::string &addr) { + pn_terminus_set_address(impl_, addr.c_str()); +} + +bool terminus::is_dynamic() { + return (type_t) pn_terminus_is_dynamic(impl_); +} + +void terminus::dynamic(bool d) { + pn_terminus_set_dynamic(impl_, d); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/transport.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/transport.cpp b/proton-c/bindings/cpp/src/transport.cpp new file mode 100644 index 0000000..b386f7b --- /dev/null +++ b/proton-c/bindings/cpp/src/transport.cpp @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +#include "proton/transport.hpp" +#include "proton/connection.hpp" + +#include "proton/transport.h" + +namespace proton { + + +transport::transport() : connection_(0), pn_transport_(::pn_transport()) {} + +transport::~transport() { ::pn_decref(pn_transport_); } + +void transport::bind(class connection &c) { + connection_ = &c; + pn_transport_bind(pn_transport_, c.pn_connection()); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/types.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/types.cpp b/proton-c/bindings/cpp/src/types.cpp index b34567b..dcebc6f 100644 --- a/proton-c/bindings/cpp/src/types.cpp +++ b/proton-c/bindings/cpp/src/types.cpp @@ -24,45 +24,28 @@ namespace proton { -Uuid::Uuid() { std::fill(bytes, bytes+SIZE, 0); } -Uuid::Uuid(const pn_uuid_t& u) { std::copy(u.bytes, u.bytes+SIZE, bytes); } - -Uuid::operator pn_uuid_t() const { - pn_uuid_t u; - std::copy(begin(), end(), u.bytes); - return u; -} - -bool Uuid::operator==(const Uuid& x) const { - return std::equal(begin(), end(), x.begin()); -} - -bool Uuid::operator<(const Uuid& x) const { - return std::lexicographical_compare(begin(), end(), x.begin(), x.end()) < 0; -} - namespace { -inline std::ostream& printSegment(std::ostream& o, const Uuid& u, size_t begin, size_t end, const char* sep="") { +inline std::ostream& print_segment(std::ostream& o, const amqp_uuid& u, size_t begin, size_t end, const char* sep="") { for (const char* p = &u[begin]; p < &u[end]; ++p) o << *p; return o << sep; } } -std::ostream& operator<<(std::ostream& o, const Uuid& u) { +std::ostream& operator<<(std::ostream& o, const amqp_uuid& u) { std::ios_base::fmtflags ff = o.flags(); o.flags(std::ios_base::hex); - printSegment(o, u, 0, 4, "-"); - printSegment(o, u, 4, 6, "-"); - printSegment(o, u, 6, 8, "-"); - printSegment(o, u, 8, 10, "-"); - printSegment(o, u, 10, 16); + print_segment(o, u, 0, 4, "-"); + 
print_segment(o, u, 4, 6, "-"); + print_segment(o, u, 6, 8, "-"); + print_segment(o, u, 8, 10, "-"); + print_segment(o, u, 10, 16); o.flags(ff); return o; } -std::string typeName(TypeId t) { +std::string type_name(type_id t) { switch (t) { - case NULL_: return "null"; + case NULl_: return "null"; case BOOL: return "bool"; case UBYTE: return "ubyte"; case BYTE: return "byte"; @@ -91,9 +74,9 @@ std::string typeName(TypeId t) { } } -std::ostream& operator<<(std::ostream& o,TypeId t) { return o << typeName(t); } +std::ostream& operator<<(std::ostream& o,type_id t) { return o << type_name(t); } -PN_CPP_EXTERN bool isContainer(TypeId t) { +PN_CPP_EXTERN bool is_container(type_id t) { return (t == LIST || t == MAP || t == ARRAY || t == DESCRIBED); } @@ -104,10 +87,10 @@ pn_bytes_t pn_bytes(const std::string& s) { std::string str(const pn_bytes_t& b) { return std::string(b.start, b.size); } -Start::Start(TypeId t, TypeId e, bool d, size_t s) : type(t), element(e), isDescribed(d), size(s) {} -Start Start::array(TypeId element, bool described) { return Start(ARRAY, element, described); } -Start Start::list() { return Start(LIST); } -Start Start::map() { return Start(MAP); } -Start Start::described() { return Start(DESCRIBED, NULL_, true); } +start::start(type_id t, type_id e, bool d, size_t s) : type(t), element(e), is_described(d), size(s) {} +start start::array(type_id element, bool described) { return start(ARRAY, element, described); } +start start::list() { return start(LIST); } +start start::map() { return start(MAP); } +start start::described() { return start(DESCRIBED, NULl_, true); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/url.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/url.cpp b/proton-c/bindings/cpp/src/url.cpp new file mode 100644 index 0000000..5744f49 --- /dev/null +++ b/proton-c/bindings/cpp/src/url.cpp @@ -0,0 +1,76 @@ +/* + * + * 
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/error.hpp" +#include "url.hpp" +#include "proton_impl_ref.hpp" +#include "msg.hpp" + +namespace proton { + +template class proton_handle<pn_url_t>; +typedef proton_impl_ref<Url> PI; + + +Url::Url(const std::string &url) { + pn_url_t *up = pn_url_parse(url.c_str()); + // refcount is 1, no need to incref + if (!up) + throw error(MSG("invalid URL: " << url)); + impl_ = up; +} + +Url::~Url() { PI::dtor(*this); } + +Url::Url(const Url& c) : proton_handle<pn_url_t>() { + PI::copy(*this, c); +} + +Url& Url::operator=(const Url& c) { + return PI::assign(*this, c); +} + +std::string Url::port() { + const char *p = pn_url_get_port(impl_); + if (!p) + return std::string("5672"); + else + return std::string(p); +} + +std::string Url::host() { + const char *p = pn_url_get_host(impl_); + if (!p) + return std::string("0.0.0.0"); + else + return std::string(p); +} + +std::string Url::path() { + const char *p = pn_url_get_path(impl_); + if (!p) + return std::string(""); + else + return std::string(p); +} + + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email 
protected]
