http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/link.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/link.cpp b/proton-c/bindings/cpp/src/link.cpp deleted file mode 100644 index eb4ccb1..0000000 --- a/proton-c/bindings/cpp/src/link.cpp +++ /dev/null @@ -1,84 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "proton_bits.hpp" - -#include "proton/link.hpp" -#include "proton/error.hpp" -#include "proton/connection.hpp" - -#include <proton/connection.h> -#include <proton/session.h> -#include <proton/link.h> - -#include "contexts.hpp" -#include "msg.hpp" -#include "proton_bits.hpp" - -namespace proton { - -void link::attach() { - pn_link_open(pn_object()); -} - -void link::close() { - pn_link_close(pn_object()); -} - -void link::detach() { - pn_link_detach(pn_object()); -} - -int link::credit() const { - pn_link_t *lnk = pn_object(); - if (pn_link_is_sender(lnk)) - return pn_link_credit(lnk); - link_context& lctx = link_context::get(lnk); - return pn_link_credit(lnk) + lctx.pending_credit; -} - -bool link::draining() { - pn_link_t *lnk = pn_object(); - link_context& lctx = link_context::get(lnk); - if (pn_link_is_sender(lnk)) - return pn_link_credit(lnk) > 0 && lctx.draining; - else - return lctx.draining; -} - -std::string link::name() const { return str(pn_link_name(pn_object()));} - -container& link::container() const { - return connection().container(); -} - -class connection link::connection() const { - return make_wrapper(pn_session_connection(pn_link_session(pn_object()))); -} - -class session link::session() const { - return make_wrapper(pn_link_session(pn_object())); -} - -error_condition link::error() const { - return make_wrapper(pn_link_remote_condition(pn_object())); -} -}
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/listener.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/listener.cpp b/proton-c/bindings/cpp/src/listener.cpp deleted file mode 100644 index 2639f5e..0000000 --- a/proton-c/bindings/cpp/src/listener.cpp +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#include "proton/listener.hpp" -#include "proton/container.hpp" - -namespace proton { - -listener::listener() : container_(0) {} -listener::listener(container& c, const std::string& u) : url_(u), container_(&c) {} -void listener::stop() { if (container_) container_->stop_listening(url_); } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/message.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/message.cpp b/proton-c/bindings/cpp/src/message.cpp deleted file mode 100644 index eecfa3b..0000000 --- a/proton-c/bindings/cpp/src/message.cpp +++ /dev/null @@ -1,335 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "proton/delivery.hpp" -#include "proton/error.hpp" -#include "proton/link.hpp" -#include "proton/message.hpp" -#include "proton/message_id.hpp" -#include "proton/receiver.hpp" -#include "proton/sender.hpp" -#include "proton/timestamp.hpp" - -#include "msg.hpp" -#include "proton_bits.hpp" -#include "types_internal.hpp" - -#include <proton/delivery.h> -#include <proton/message.h> - -#include <string> -#include <algorithm> -#include <assert.h> - -namespace proton { - -message::message() : pn_msg_(0) {} -message::message(const message &m) : pn_msg_(0) { *this = m; } - -#if PN_CPP_HAS_RVALUE_REFERENCES -message::message(message &&m) : pn_msg_(0) { swap(*this, m); } -message& message::operator=(message&& m) { - swap(*this, m); - return *this; -} -#endif - -message::message(const value& x) : pn_msg_(0) { body() = x; } - -message::~message() { - // Workaround proton bug: Must release all refs to body before calling pn_message_free() - body_.reset(); - pn_message_free(pn_msg_); -} - -void swap(message& x, message& y) { - using std::swap; - swap(x.pn_msg_, y.pn_msg_); - swap(x.body_, y.body_); - swap(x.application_properties_, y.application_properties_); - swap(x.message_annotations_, y.message_annotations_); - swap(x.delivery_annotations_, y.delivery_annotations_); -} - -pn_message_t *message::pn_msg() const { - if (!pn_msg_) pn_msg_ = pn_message(); - body_.refer(pn_message_body(pn_msg_)); - return pn_msg_; -} - -message& message::operator=(const message& m) { - if (&m != this) { - // TODO aconway 2015-08-10: more efficient pn_message_copy function - std::vector<char> data; - m.encode(data); - decode(data); - } - return *this; -} - -void message::clear() { if (pn_msg_) pn_message_clear(pn_msg_); } - -namespace { -void check(int err) { - if (err) throw error(error_str(err)); -} -} // namespace - -void message::id(const message_id& id) { pn_message_set_id(pn_msg(), id.atom_); } - -message_id message::id() const { - return pn_message_get_id(pn_msg()); -} - 
-void message::user(const std::string &id) { - check(pn_message_set_user_id(pn_msg(), pn_bytes(id))); -} - -std::string message::user() const { - return str(pn_message_get_user_id(pn_msg())); -} - -void message::to(const std::string &addr) { - check(pn_message_set_address(pn_msg(), addr.c_str())); -} - -std::string message::to() const { - const char* addr = pn_message_get_address(pn_msg()); - return addr ? std::string(addr) : std::string(); -} - -void message::address(const std::string &addr) { - check(pn_message_set_address(pn_msg(), addr.c_str())); -} - -std::string message::address() const { - const char* addr = pn_message_get_address(pn_msg()); - return addr ? std::string(addr) : std::string(); -} - -void message::subject(const std::string &s) { - check(pn_message_set_subject(pn_msg(), s.c_str())); -} - -std::string message::subject() const { - const char* s = pn_message_get_subject(pn_msg()); - return s ? std::string(s) : std::string(); -} - -void message::reply_to(const std::string &s) { - check(pn_message_set_reply_to(pn_msg(), s.c_str())); -} - -std::string message::reply_to() const { - const char* s = pn_message_get_reply_to(pn_msg()); - return s ? std::string(s) : std::string(); -} - -void message::correlation_id(const message_id& id) { - internal::value_ref(pn_message_correlation_id(pn_msg())) = id; -} - -message_id message::correlation_id() const { - return pn_message_get_correlation_id(pn_msg()); -} - -void message::content_type(const std::string &s) { - check(pn_message_set_content_type(pn_msg(), s.c_str())); -} - -std::string message::content_type() const { - const char* s = pn_message_get_content_type(pn_msg()); - return s ? std::string(s) : std::string(); -} - -void message::content_encoding(const std::string &s) { - check(pn_message_set_content_encoding(pn_msg(), s.c_str())); -} - -std::string message::content_encoding() const { - const char* s = pn_message_get_content_encoding(pn_msg()); - return s ? 
std::string(s) : std::string(); -} - -void message::expiry_time(timestamp t) { - pn_message_set_expiry_time(pn_msg(), t.milliseconds()); -} -timestamp message::expiry_time() const { - return timestamp(pn_message_get_expiry_time(pn_msg())); -} - -void message::creation_time(timestamp t) { - pn_message_set_creation_time(pn_msg(), t.milliseconds()); -} -timestamp message::creation_time() const { - return timestamp(pn_message_get_creation_time(pn_msg())); -} - -void message::group_id(const std::string &s) { - check(pn_message_set_group_id(pn_msg(), s.c_str())); -} - -std::string message::group_id() const { - const char* s = pn_message_get_group_id(pn_msg()); - return s ? std::string(s) : std::string(); -} - -void message::reply_to_group_id(const std::string &s) { - check(pn_message_set_reply_to_group_id(pn_msg(), s.c_str())); -} - -std::string message::reply_to_group_id() const { - const char* s = pn_message_get_reply_to_group_id(pn_msg()); - return s ? std::string(s) : std::string(); -} - -bool message::inferred() const { return pn_message_is_inferred(pn_msg()); } - -void message::inferred(bool b) { pn_message_set_inferred(pn_msg(), b); } - -void message::body(const value& x) { body() = x; } - -const value& message::body() const { pn_msg(); return body_; } -value& message::body() { pn_msg(); return body_; } - -// MAP CACHING: the properties and annotations maps can either be encoded in the -// pn_message pn_data_t structures OR decoded as C++ map members of the message -// but not both. At least one of the pn_data_t or the map member is always -// empty, the non-empty one is the authority. - -// Decode a map on demand -template<class M, class F> M& get_map(pn_message_t* msg, F get, M& map) { - codec::decoder d(make_wrapper(get(msg))); - if (map.empty() && !d.empty()) { - d.rewind(); - d >> map; - d.clear(); // The map member is now the authority. - } - return map; -} - -// Encode a map if necessary. 
-template<class M, class F> M& put_map(pn_message_t* msg, F get, M& map) { - codec::encoder e(make_wrapper(get(msg))); - if (e.empty() && !map.empty()) { - e << map; - map.clear(); // The encoded pn_data_t is now the authority. - } - return map; -} - -message::property_map& message::properties() { - return get_map(pn_msg(), pn_message_properties, application_properties_); -} - -const message::property_map& message::properties() const { - return get_map(pn_msg(), pn_message_properties, application_properties_); -} - - -message::annotation_map& message::message_annotations() { - return get_map(pn_msg(), pn_message_annotations, message_annotations_); -} - -const message::annotation_map& message::message_annotations() const { - return get_map(pn_msg(), pn_message_annotations, message_annotations_); -} - - -message::annotation_map& message::delivery_annotations() { - return get_map(pn_msg(), pn_message_instructions, delivery_annotations_); -} - -const message::annotation_map& message::delivery_annotations() const { - return get_map(pn_msg(), pn_message_instructions, delivery_annotations_); -} - -void message::encode(std::vector<char> &s) const { - put_map(pn_msg(), pn_message_properties, application_properties_); - put_map(pn_msg(), pn_message_annotations, message_annotations_); - put_map(pn_msg(), pn_message_instructions, delivery_annotations_); - size_t sz = std::max(s.capacity(), size_t(512)); - while (true) { - s.resize(sz); - assert(!s.empty()); - int err = pn_message_encode(pn_msg(), const_cast<char*>(&s[0]), &sz); - if (err) { - if (err != PN_OVERFLOW) - check(err); - } else { - s.resize(sz); - return; - } - sz *= 2; - } -} - -std::vector<char> message::encode() const { - std::vector<char> data; - encode(data); - return data; -} - -void message::decode(const std::vector<char> &s) { - if (s.empty()) - throw error("message decode: no data"); - application_properties_.clear(); - message_annotations_.clear(); - delivery_annotations_.clear(); - assert(!s.empty()); - 
check(pn_message_decode(pn_msg(), &s[0], s.size())); -} - -void message::decode(proton::delivery delivery) { - std::vector<char> buf; - buf.resize(pn_delivery_pending(unwrap(delivery))); - if (buf.empty()) - throw error("message decode: no delivery pending on link"); - proton::receiver link = delivery.receiver(); - assert(!buf.empty()); - ssize_t n = pn_link_recv(unwrap(link), const_cast<char *>(&buf[0]), buf.size()); - if (n != ssize_t(buf.size())) throw error(MSG("receiver read failure")); - clear(); - decode(buf); - pn_link_advance(unwrap(link)); -} - -bool message::durable() const { return pn_message_is_durable(pn_msg()); } -void message::durable(bool b) { pn_message_set_durable(pn_msg(), b); } - -duration message::ttl() const { return duration(pn_message_get_ttl(pn_msg())); } -void message::ttl(duration d) { pn_message_set_ttl(pn_msg(), d.milliseconds()); } - -uint8_t message::priority() const { return pn_message_get_priority(pn_msg()); } -void message::priority(uint8_t d) { pn_message_set_priority(pn_msg(), d); } - -bool message::first_acquirer() const { return pn_message_is_first_acquirer(pn_msg()); } -void message::first_acquirer(bool b) { pn_message_set_first_acquirer(pn_msg(), b); } - -uint32_t message::delivery_count() const { return pn_message_get_delivery_count(pn_msg()); } -void message::delivery_count(uint32_t d) { pn_message_set_delivery_count(pn_msg(), d); } - -int32_t message::group_sequence() const { return pn_message_get_group_sequence(pn_msg()); } -void message::group_sequence(int32_t d) { pn_message_set_group_sequence(pn_msg(), d); } - -const uint8_t message::default_priority = PN_DEFAULT_PRIORITY; - -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/message_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/message_test.cpp b/proton-c/bindings/cpp/src/message_test.cpp deleted file mode 100644 index 4d4e239..0000000 --- 
a/proton-c/bindings/cpp/src/message_test.cpp +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "proton/message.hpp" -#include "proton/scalar.hpp" -#include "test_bits.hpp" -#include <string> -#include <fstream> -#include <streambuf> -#include <iosfwd> - -namespace { - -using namespace std; -using namespace proton; - -#define CHECK_STR(ATTR) \ - m.ATTR(#ATTR); \ - ASSERT_EQUAL(std::string(#ATTR), m.ATTR()) - -#define CHECK_MESSAGE_ID(ATTR) \ - m.ATTR(#ATTR); \ - ASSERT_EQUAL(scalar(#ATTR), m.ATTR()) - -void test_message_defaults() { - message m; - ASSERT(m.body().empty()); - ASSERT(m.id().empty()); - ASSERT(m.user().empty()); - ASSERT(m.to().empty()); - ASSERT(m.subject().empty()); - ASSERT(m.reply_to().empty()); - ASSERT(m.correlation_id().empty()); - ASSERT(m.content_type().empty()); - ASSERT(m.content_encoding().empty()); - ASSERT(m.group_id().empty()); - ASSERT(m.reply_to_group_id().empty()); - ASSERT_EQUAL(0, m.expiry_time().milliseconds()); - ASSERT_EQUAL(0, m.creation_time().milliseconds()); - - ASSERT_EQUAL(false, m.inferred()); - ASSERT_EQUAL(false, m.durable()); - ASSERT_EQUAL(0, m.ttl().milliseconds()); - ASSERT_EQUAL(message::default_priority, 
m.priority()); - ASSERT_EQUAL(false, m.first_acquirer()); - ASSERT_EQUAL(0u, m.delivery_count()); -} - -void test_message_properties() { - message m("hello"); - std::string s = get<std::string>(m.body()); - ASSERT_EQUAL("hello", s); - - CHECK_MESSAGE_ID(id); - CHECK_STR(user); - CHECK_STR(to); - CHECK_STR(subject); - CHECK_STR(reply_to); - CHECK_MESSAGE_ID(correlation_id); - CHECK_STR(content_type); - CHECK_STR(content_encoding); - CHECK_STR(group_id); - CHECK_STR(reply_to_group_id); - m.expiry_time(timestamp(42)); - ASSERT_EQUAL(m.expiry_time().milliseconds(), 42); - m.creation_time(timestamp(4242)); - ASSERT_EQUAL(m.creation_time().milliseconds(), 4242); - m.ttl(duration(30)); - ASSERT_EQUAL(m.ttl().milliseconds(), 30); - m.priority(3); - ASSERT_EQUAL(m.priority(), 3); - - message m2(m); - ASSERT_EQUAL("hello", get<std::string>(m2.body())); - ASSERT_EQUAL(message_id("id"), m2.id()); - ASSERT_EQUAL("user", m2.user()); - ASSERT_EQUAL("to", m2.to()); - ASSERT_EQUAL("subject", m2.subject()); - ASSERT_EQUAL("reply_to", m2.reply_to()); - ASSERT_EQUAL(message_id("correlation_id"), m2.correlation_id()); - ASSERT_EQUAL("content_type", m2.content_type()); - ASSERT_EQUAL("content_encoding", m2.content_encoding()); - ASSERT_EQUAL("group_id", m2.group_id()); - ASSERT_EQUAL("reply_to_group_id", m2.reply_to_group_id()); - ASSERT_EQUAL(42, m2.expiry_time().milliseconds()); - ASSERT_EQUAL(4242, m.creation_time().milliseconds()); - - m2 = m; - ASSERT_EQUAL("hello", get<std::string>(m2.body())); - ASSERT_EQUAL(message_id("id"), m2.id()); - ASSERT_EQUAL("user", m2.user()); - ASSERT_EQUAL("to", m2.to()); - ASSERT_EQUAL("subject", m2.subject()); - ASSERT_EQUAL("reply_to", m2.reply_to()); - ASSERT_EQUAL(message_id("correlation_id"), m2.correlation_id()); - ASSERT_EQUAL("content_type", m2.content_type()); - ASSERT_EQUAL("content_encoding", m2.content_encoding()); - ASSERT_EQUAL("group_id", m2.group_id()); - ASSERT_EQUAL("reply_to_group_id", m2.reply_to_group_id()); - ASSERT_EQUAL(42, 
m2.expiry_time().milliseconds()); - ASSERT_EQUAL(4242, m.creation_time().milliseconds()); -} - -void test_message_body() { - std::string s("hello"); - message m1(s.c_str()); - ASSERT_EQUAL(s, get<std::string>(m1.body())); - message m2(s); - ASSERT_EQUAL(s, coerce<std::string>(m2.body())); - message m3; - m3.body(s); - ASSERT_EQUAL(s, coerce<std::string>(m3.body())); - ASSERT_EQUAL(5, coerce<int64_t>(message(5).body())); - ASSERT_EQUAL(3.1, coerce<double>(message(3.1).body())); -} - -void test_message_maps() { - message m; - - ASSERT(m.properties().empty()); - ASSERT(m.message_annotations().empty()); - ASSERT(m.delivery_annotations().empty()); - - m.properties().put("foo", 12); - m.delivery_annotations().put("bar", "xyz"); - - m.message_annotations().put(23, "23"); - ASSERT_EQUAL(m.properties().get("foo"), scalar(12)); - ASSERT_EQUAL(m.delivery_annotations().get("bar"), scalar("xyz")); - ASSERT_EQUAL(m.message_annotations().get(23), scalar("23")); - - message m2(m); - - ASSERT_EQUAL(m2.properties().get("foo"), scalar(12)); - ASSERT_EQUAL(m2.delivery_annotations().get("bar"), scalar("xyz")); - ASSERT_EQUAL(m2.message_annotations().get(23), scalar("23")); - - m.properties().put("foo","newfoo"); - m.delivery_annotations().put(24, 1000); - m.message_annotations().erase(23); - - m2 = m; - ASSERT_EQUAL(1u, m2.properties().size()); - ASSERT_EQUAL(m2.properties().get("foo"), scalar("newfoo")); - ASSERT_EQUAL(2u, m2.delivery_annotations().size()); - ASSERT_EQUAL(m2.delivery_annotations().get("bar"), scalar("xyz")); - ASSERT_EQUAL(m2.delivery_annotations().get(24), scalar(1000)); - ASSERT(m2.message_annotations().empty()); -} - -} - -int main(int, char**) { - int failed = 0; - RUN_TEST(failed, test_message_properties()); - RUN_TEST(failed, test_message_defaults()); - RUN_TEST(failed, test_message_body()); - RUN_TEST(failed, test_message_maps()); - return failed; -} 
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/messaging_adapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp deleted file mode 100644 index 17c84cd..0000000 --- a/proton-c/bindings/cpp/src/messaging_adapter.cpp +++ /dev/null @@ -1,304 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "messaging_adapter.hpp" - -#include "proton/delivery.hpp" -#include "proton/error.hpp" -#include "proton/receiver_options.hpp" -#include "proton/sender.hpp" -#include "proton/sender_options.hpp" -#include "proton/tracker.hpp" -#include "proton/transport.hpp" - -#include "contexts.hpp" -#include "msg.hpp" -#include "proton_bits.hpp" -#include "proton_event.hpp" - -#include <proton/connection.h> -#include <proton/delivery.h> -#include <proton/handlers.h> -#include <proton/link.h> -#include <proton/message.h> -#include <proton/session.h> -#include <proton/transport.h> - -namespace proton { - -namespace { -void credit_topup(pn_link_t *link) { - if (link && pn_link_is_receiver(link)) { - int window = link_context::get(link).credit_window; - if (window) { - int delta = window - pn_link_credit(link); - pn_link_flow(link, delta); - } - } -} -} - -void messaging_adapter::on_reactor_init(proton_event &pe) { - delegate_.on_container_start(pe.container()); -} - -void messaging_adapter::on_reactor_final(proton_event &pe) { - delegate_.on_container_stop(pe.container()); -} - -void messaging_adapter::on_link_flow(proton_event &pe) { - pn_event_t *pne = pe.pn_event(); - pn_link_t *lnk = pn_event_link(pne); - // TODO: process session flow data, if no link-specific data, just return. 
- if (!lnk) return; - link_context& lctx = link_context::get(lnk); - int state = pn_link_state(lnk); - if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) { - if (pn_link_is_sender(lnk)) { - if (pn_link_credit(lnk) > 0) { - sender s(make_wrapper<sender>(lnk)); - if (pn_link_get_drain(lnk)) { - if (!lctx.draining) { - lctx.draining = true; - delegate_.on_sender_drain_start(s); - } - } - else { - lctx.draining = false; - } - // create on_message extended event - delegate_.on_sendable(s); - } - } - else { - // receiver - if (!pn_link_credit(lnk) && lctx.draining) { - lctx.draining = false; - receiver r(make_wrapper<receiver>(lnk)); - delegate_.on_receiver_drain_finish(r); - } - } - } - credit_topup(lnk); -} - -void messaging_adapter::on_delivery(proton_event &pe) { - pn_event_t *cevent = pe.pn_event(); - pn_link_t *lnk = pn_event_link(cevent); - pn_delivery_t *dlv = pn_event_delivery(cevent); - link_context& lctx = link_context::get(lnk); - - if (pn_link_is_receiver(lnk)) { - delivery d(make_wrapper<delivery>(dlv)); - if (!pn_delivery_partial(dlv) && pn_delivery_readable(dlv)) { - // generate on_message - pn_connection_t *pnc = pn_session_connection(pn_link_session(lnk)); - connection_context& ctx = connection_context::get(pnc); - // Reusable per-connection message. - // Avoid expensive heap malloc/free overhead. 
- // See PROTON-998 - class message &msg(ctx.event_message); - msg.decode(d); - if (pn_link_state(lnk) & PN_LOCAL_CLOSED) { - if (lctx.auto_accept) - d.release(); - } else { - delegate_.on_message(d, msg); - if (lctx.auto_accept && !d.settled()) - d.accept(); - if (lctx.draining && !pn_link_credit(lnk)) { - lctx.draining = false; - receiver r(make_wrapper<receiver>(lnk)); - delegate_.on_receiver_drain_finish(r); - } - } - } - else if (pn_delivery_updated(dlv) && d.settled()) { - delegate_.on_delivery_settle(d); - } - if (lctx.draining && pn_link_credit(lnk) == 0) { - lctx.draining = false; - pn_link_set_drain(lnk, false); - receiver r(make_wrapper<receiver>(lnk)); - delegate_.on_receiver_drain_finish(r); - if (lctx.pending_credit) { - pn_link_flow(lnk, lctx.pending_credit); - lctx.pending_credit = 0; - } - } - credit_topup(lnk); - } else { - tracker t(make_wrapper<tracker>(dlv)); - // sender - if (pn_delivery_updated(dlv)) { - uint64_t rstate = pn_delivery_remote_state(dlv); - if (rstate == PN_ACCEPTED) { - delegate_.on_tracker_accept(t); - } - else if (rstate == PN_REJECTED) { - delegate_.on_tracker_reject(t); - } - else if (rstate == PN_RELEASED || rstate == PN_MODIFIED) { - delegate_.on_tracker_release(t); - } - - if (t.settled()) { - delegate_.on_tracker_settle(t); - } - if (lctx.auto_settle) - t.settle(); - } - } -} - -namespace { - -bool is_local_open(pn_state_t state) { - return state & PN_LOCAL_ACTIVE; -} - -bool is_local_unititialised(pn_state_t state) { - return state & PN_LOCAL_UNINIT; -} - -bool is_remote_unititialised(pn_state_t state) { - return state & PN_REMOTE_UNINIT; -} - -} // namespace - -void messaging_adapter::on_link_remote_detach(proton_event & pe) { - pn_event_t *cevent = pe.pn_event(); - pn_link_t *lnk = pn_event_link(cevent); - if (pn_link_is_receiver(lnk)) { - receiver r(make_wrapper<receiver>(lnk)); - delegate_.on_receiver_detach(r); - } else { - sender s(make_wrapper<sender>(lnk)); - delegate_.on_sender_detach(s); - } - 
pn_link_detach(lnk); -} - -void messaging_adapter::on_link_remote_close(proton_event &pe) { - pn_event_t *cevent = pe.pn_event(); - pn_link_t *lnk = pn_event_link(cevent); - if (pn_link_is_receiver(lnk)) { - receiver r(make_wrapper<receiver>(lnk)); - if (pn_condition_is_set(pn_link_remote_condition(lnk))) { - delegate_.on_receiver_error(r); - } - delegate_.on_receiver_close(r); - } else { - sender s(make_wrapper<sender>(lnk)); - if (pn_condition_is_set(pn_link_remote_condition(lnk))) { - delegate_.on_sender_error(s); - } - delegate_.on_sender_close(s); - } - pn_link_close(lnk); -} - -void messaging_adapter::on_session_remote_close(proton_event &pe) { - pn_event_t *cevent = pe.pn_event(); - pn_session_t *session = pn_event_session(cevent); - class session s(make_wrapper(session)); - if (pn_condition_is_set(pn_session_remote_condition(session))) { - delegate_.on_session_error(s); - } - delegate_.on_session_close(s); - pn_session_close(session); -} - -void messaging_adapter::on_connection_remote_close(proton_event &pe) { - pn_event_t *cevent = pe.pn_event(); - pn_connection_t *conn = pn_event_connection(cevent); - connection c(make_wrapper(conn)); - if (pn_condition_is_set(pn_connection_remote_condition(conn))) { - delegate_.on_connection_error(c); - } - delegate_.on_connection_close(c); - pn_connection_close(conn); -} - -void messaging_adapter::on_connection_remote_open(proton_event &pe) { - // Generate on_transport_open event here until we find a better place - transport t(make_wrapper(pn_event_transport(pe.pn_event()))); - delegate_.on_transport_open(t); - - pn_connection_t *conn = pn_event_connection(pe.pn_event()); - connection c(make_wrapper(conn)); - delegate_.on_connection_open(c); - if (!is_local_open(pn_connection_state(conn)) && is_local_unititialised(pn_connection_state(conn))) { - pn_connection_open(conn); - } -} - -void messaging_adapter::on_session_remote_open(proton_event &pe) { - pn_session_t *session = pn_event_session(pe.pn_event()); - class session 
s(make_wrapper(session)); - delegate_.on_session_open(s); - if (!is_local_open(pn_session_state(session)) && is_local_unititialised(pn_session_state(session))) { - pn_session_open(session); - } -} - -void messaging_adapter::on_link_local_open(proton_event &pe) { - credit_topup(pn_event_link(pe.pn_event())); -} - -void messaging_adapter::on_link_remote_open(proton_event &pe) { - pn_link_t *lnk = pn_event_link(pe.pn_event()); - container& c = pe.container(); - if (pn_link_is_receiver(lnk)) { - receiver r(make_wrapper<receiver>(lnk)); - delegate_.on_receiver_open(r); - if (is_local_unititialised(pn_link_state(lnk))) { - r.open(c.receiver_options()); - } - } else { - sender s(make_wrapper<sender>(lnk)); - delegate_.on_sender_open(s); - if (is_local_unititialised(pn_link_state(lnk))) { - s.open(c.sender_options()); - } - } - credit_topup(lnk); -} - -void messaging_adapter::on_transport_closed(proton_event &pe) { - pn_transport_t *tspt = pn_event_transport(pe.pn_event()); - transport t(make_wrapper(tspt)); - - // If the connection isn't open generate on_transport_open event - // because we didn't generate it yet and the events won't match. - pn_connection_t *conn = pn_event_connection(pe.pn_event()); - if (!conn || is_remote_unititialised(pn_connection_state(conn))) { - delegate_.on_transport_open(t); - } - - if (pn_condition_is_set(pn_transport_condition(tspt))) { - delegate_.on_transport_error(t); - } - delegate_.on_transport_close(t); -} - -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/node_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/node_options.cpp b/proton-c/bindings/cpp/src/node_options.cpp deleted file mode 100644 index 5bb2f8e..0000000 --- a/proton-c/bindings/cpp/src/node_options.cpp +++ /dev/null @@ -1,171 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "proton/source_options.hpp" -#include "proton/source.hpp" -#include "proton/target_options.hpp" -#include "proton/target.hpp" - -#include "proton_bits.hpp" - -#include <limits> - -namespace proton { - -template <class T> struct option { - T value; - bool set; - - option() : value(), set(false) {} - option& operator=(const T& x) { value = x; set = true; return *this; } - void update(const option<T>& x) { if (x.set) *this = x.value; } -}; - -namespace { - - void timeout(terminus &t, duration d) { - uint32_t seconds = 0; - if (d == duration::FOREVER) - seconds = std::numeric_limits<uint32_t>::max(); - else if (d != duration::IMMEDIATE) { - uint64_t x = d.milliseconds(); - if ((std::numeric_limits<uint64_t>::max() - x) <= 500) - seconds = std::numeric_limits<uint32_t>::max(); - else { - x = (x + 500) / 1000; - seconds = x < std::numeric_limits<uint32_t>::max() ? x : std::numeric_limits<uint32_t>::max(); - } - } - pn_terminus_set_timeout(unwrap(t), seconds); - } -} - -namespace { - -// Options common to sources and targets - -void node_address(terminus &t, option<std::string> &addr, option<bool> &dynamic) { - if (dynamic.set && dynamic.value) { - pn_terminus_set_dynamic(unwrap(t), true); - // Ignore any addr value for dynamic. 
- return; - } - if (addr.set) { - pn_terminus_set_address(unwrap(t), addr.value.c_str()); - } -} - -void node_durability(terminus &t, option<enum terminus::durability_mode> &mode) { - if (mode.set) pn_terminus_set_durability(unwrap(t), pn_durability_t(mode.value)); -} - -void node_expiry(terminus &t, option<enum terminus::expiry_policy> &policy, option<duration> &d) { - if (policy.set) pn_terminus_set_expiry_policy(unwrap(t), pn_expiry_policy_t(policy.value)); - if (d.set) timeout(t, d.value); -} - -} - - -class source_options::impl { - public: - option<std::string> address; - option<bool> dynamic; - option<enum source::durability_mode> durability_mode; - option<duration> timeout; - option<enum source::expiry_policy> expiry_policy; - option<enum source::distribution_mode> distribution_mode; - option<source::filter_map> filters; - - void apply(source& s) { - node_address(s, address, dynamic); - node_durability(s, durability_mode); - node_expiry(s, expiry_policy, timeout); - if (distribution_mode.set) - pn_terminus_set_distribution_mode(unwrap(s), pn_distribution_mode_t(distribution_mode.value)); - if (filters.set && !filters.value.empty()) { - // Applied at most once via source_option. No need to clear. 
- codec::encoder e(make_wrapper(pn_terminus_filter(unwrap(s)))); - e << filters.value; - } - } -}; - -source_options::source_options() : impl_(new impl()) {} -source_options::source_options(const source_options& x) : impl_(new impl()) { - *this = x; -} -source_options::~source_options() {} - -source_options& source_options::operator=(const source_options& x) { - *impl_ = *x.impl_; - return *this; -} - -source_options& source_options::address(const std::string &addr) { impl_->address = addr; return *this; } -source_options& source_options::dynamic(bool b) { impl_->dynamic = b; return *this; } -source_options& source_options::durability_mode(enum source::durability_mode m) { impl_->durability_mode = m; return *this; } -source_options& source_options::timeout(duration d) { impl_->timeout = d; return *this; } -source_options& source_options::expiry_policy(enum source::expiry_policy m) { impl_->expiry_policy = m; return *this; } -source_options& source_options::distribution_mode(enum source::distribution_mode m) { impl_->distribution_mode = m; return *this; } -source_options& source_options::filters(const source::filter_map &map) { impl_->filters = map; return *this; } - -void source_options::apply(source& s) const { impl_->apply(s); } - -// TARGET - -class target_options::impl { - public: - option<std::string> address; - option<bool> dynamic; - option<enum target::durability_mode> durability_mode; - option<duration> timeout; - option<enum target::expiry_policy> expiry_policy; - - void apply(target& t) { - node_address(t, address, dynamic); - node_durability(t, durability_mode); - node_expiry(t, expiry_policy, timeout); - } -}; - -target_options::target_options() : impl_(new impl()) {} -target_options::target_options(const target_options& x) : impl_(new impl()) { - *this = x; -} -target_options::~target_options() {} - -target_options& target_options::operator=(const target_options& x) { - *impl_ = *x.impl_; - return *this; -} - -target_options& 
target_options::address(const std::string &addr) { impl_->address = addr; return *this; } -target_options& target_options::dynamic(bool b) { impl_->dynamic = b; return *this; } -target_options& target_options::durability_mode(enum target::durability_mode m) { impl_->durability_mode = m; return *this; } -target_options& target_options::timeout(duration d) { impl_->timeout = d; return *this; } -target_options& target_options::expiry_policy(enum target::expiry_policy m) { impl_->expiry_policy = m; return *this; } - -void target_options::apply(target& s) const { impl_->apply(s); } - - - -} // namespace proton http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/object.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/object.cpp b/proton-c/bindings/cpp/src/object.cpp deleted file mode 100644 index 2f3a348..0000000 --- a/proton-c/bindings/cpp/src/object.cpp +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#include "proton/internal/object.hpp" -#include <proton/object.h> - -namespace proton { -namespace internal { - -/* Add a reference to p; a null pointer is ignored. */ -void pn_ptr_base::incref(void *p) { - if (p) ::pn_incref(const_cast<void*>(p)); -} - -/* Drop a reference to p; a null pointer is ignored. */ -void pn_ptr_base::decref(void *p) { - if (p) ::pn_decref(const_cast<void*>(p)); -} - -/* Return the pn_inspect() text for p, or an empty string when p is null. */ -std::string pn_ptr_base::inspect(void* p) { - if (!p) return std::string(); - ::pn_string_t* s = ::pn_string(NULL); - (void) ::pn_inspect(p, s); - /* Copy the text out before releasing s: returning straight from pn_string_get(s) never freed the pn_string_t. */ - const char* text = ::pn_string_get(s); - /* Defensive: never build a std::string from a null pointer. */ - std::string result(text ? text : ""); - ::pn_free(s); - return result; -} -}} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/proton_bits.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_bits.cpp b/proton-c/bindings/cpp/src/proton_bits.cpp deleted file mode 100644 index 18fc589..0000000 --- a/proton-c/bindings/cpp/src/proton_bits.cpp +++ /dev/null @@ -1,76 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#include "proton_bits.hpp" -#include "proton/error_condition.hpp" - -#include <string> -#include <ostream> - -#include <proton/condition.h> -#include <proton/error.h> -#include <proton/object.h> - -namespace proton { - -/* Describe a proton error code; codes not listed in the switch map to a generic unknown-code message. */ std::string error_str(long code) { - switch (code) - { - case 0: return "ok"; - case PN_EOS: return "end of data stream"; - case PN_ERR: return "error"; - case PN_OVERFLOW: return "overflow"; - case PN_UNDERFLOW: return "underflow"; - case PN_STATE_ERR: return "invalid state"; - case PN_ARG_ERR: return "invalid argument"; - case PN_TIMEOUT: return "timeout"; - case PN_INTR: return "interrupt"; - default: return "unknown error code"; - } -} - -/* Prefer the text stored in err when err is non-null and holds a non-zero code; if it has no text, describe its code. Otherwise describe the code argument. */ std::string error_str(pn_error_t* err, long code) { - if (err && pn_error_code(err)) { - const char* text = pn_error_text(err); - return text ? std::string(text) : error_str(pn_error_code(err)); - } - return error_str(code); -} - -/* Write the pn_inspect() rendering of object.value to o; the temporary pn_string_t is freed before returning. */ std::ostream& operator<<(std::ostream& o, const inspectable& object) { - pn_string_t* str = pn_string(""); - pn_inspect(object.value, str); - o << pn_string_get(str); - pn_free(str); - return o; -} - -/* Clear c, then copy the name and description of e (each only when non-empty) and the properties of e into c. */ void set_error_condition(const error_condition& e, pn_condition_t *c) { - pn_condition_clear(c); - - if (!e.name().empty()) { - pn_condition_set_name(c, e.name().c_str()); - } - if (!e.description().empty()) { - pn_condition_set_description(c, e.description().c_str()); - } - internal::value_ref(pn_condition_info(c)) = e.properties(); -} - -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/proton_event.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_event.cpp b/proton-c/bindings/cpp/src/proton_event.cpp deleted file mode 100644 index 9a1ffea..0000000 --- a/proton-c/bindings/cpp/src/proton_event.cpp +++ /dev/null @@ -1,88 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "proton_event.hpp" - -#include "proton/error.hpp" - -#include "msg.hpp" -#include "proton_handler.hpp" -#include "types_internal.hpp" - -namespace proton { - -void proton_event::dispatch(proton_handler &handler) { - pn_event_type_t type = pn_event_type(pn_event_); - switch(type) { - - case PN_REACTOR_INIT: handler.on_reactor_init(*this); break; - case PN_REACTOR_QUIESCED: handler.on_reactor_quiesced(*this); break; - case PN_REACTOR_FINAL: handler.on_reactor_final(*this); break; - - case PN_TIMER_TASK: handler.on_timer_task(*this); break; - - case PN_CONNECTION_INIT: handler.on_connection_init(*this); break; - case PN_CONNECTION_BOUND: handler.on_connection_bound(*this); break; - case PN_CONNECTION_UNBOUND: handler.on_connection_unbound(*this); break; - case PN_CONNECTION_LOCAL_OPEN: handler.on_connection_local_open(*this); break; - case PN_CONNECTION_LOCAL_CLOSE: handler.on_connection_local_close(*this); break; - case PN_CONNECTION_REMOTE_OPEN: handler.on_connection_remote_open(*this); break; - case PN_CONNECTION_REMOTE_CLOSE: handler.on_connection_remote_close(*this); break; - case PN_CONNECTION_FINAL: handler.on_connection_final(*this); break; - - case PN_SESSION_INIT: handler.on_session_init(*this); break; - case PN_SESSION_LOCAL_OPEN: 
handler.on_session_local_open(*this); break; - case PN_SESSION_LOCAL_CLOSE: handler.on_session_local_close(*this); break; - case PN_SESSION_REMOTE_OPEN: handler.on_session_remote_open(*this); break; - case PN_SESSION_REMOTE_CLOSE: handler.on_session_remote_close(*this); break; - case PN_SESSION_FINAL: handler.on_session_final(*this); break; - - case PN_LINK_INIT: handler.on_link_init(*this); break; - case PN_LINK_LOCAL_OPEN: handler.on_link_local_open(*this); break; - case PN_LINK_LOCAL_CLOSE: handler.on_link_local_close(*this); break; - case PN_LINK_LOCAL_DETACH: handler.on_link_local_detach(*this); break; - case PN_LINK_REMOTE_OPEN: handler.on_link_remote_open(*this); break; - case PN_LINK_REMOTE_CLOSE: handler.on_link_remote_close(*this); break; - case PN_LINK_REMOTE_DETACH: handler.on_link_remote_detach(*this); break; - case PN_LINK_FLOW: handler.on_link_flow(*this); break; - case PN_LINK_FINAL: handler.on_link_final(*this); break; - - case PN_DELIVERY: handler.on_delivery(*this); break; - - case PN_TRANSPORT: handler.on_transport(*this); break; - case PN_TRANSPORT_ERROR: handler.on_transport_error(*this); break; - case PN_TRANSPORT_HEAD_CLOSED: handler.on_transport_head_closed(*this); break; - case PN_TRANSPORT_TAIL_CLOSED: handler.on_transport_tail_closed(*this); break; - case PN_TRANSPORT_CLOSED: handler.on_transport_closed(*this); break; - - case PN_SELECTABLE_INIT: handler.on_selectable_init(*this); break; - case PN_SELECTABLE_UPDATED: handler.on_selectable_updated(*this); break; - case PN_SELECTABLE_READABLE: handler.on_selectable_readable(*this); break; - case PN_SELECTABLE_WRITABLE: handler.on_selectable_writable(*this); break; - case PN_SELECTABLE_EXPIRED: handler.on_selectable_expired(*this); break; - case PN_SELECTABLE_ERROR: handler.on_selectable_error(*this); break; - case PN_SELECTABLE_FINAL: handler.on_selectable_final(*this); break; - default: - throw error(MSG("Invalid Proton event type " << type)); - } -} - -} 
http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/proton_handler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_handler.cpp b/proton-c/bindings/cpp/src/proton_handler.cpp deleted file mode 100644 index 87d00a3..0000000 --- a/proton-c/bindings/cpp/src/proton_handler.cpp +++ /dev/null @@ -1,74 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ -#include "proton_handler.hpp" -#include "proton_event.hpp" - -namespace proton { - -proton_handler::proton_handler() {} -proton_handler::~proton_handler() {} - -// Everything goes to on_unhandled() unless overridden by a subclass - -void proton_handler::on_reactor_init(proton_event &e) { on_unhandled(e); } -void proton_handler::on_reactor_quiesced(proton_event &e) { on_unhandled(e); } -void proton_handler::on_reactor_final(proton_event &e) { on_unhandled(e); } -void proton_handler::on_timer_task(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_init(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_bound(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_unbound(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_local_open(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_local_close(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_remote_open(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_remote_close(proton_event &e) { on_unhandled(e); } -void proton_handler::on_connection_final(proton_event &e) { on_unhandled(e); } -void proton_handler::on_session_init(proton_event &e) { on_unhandled(e); } -void proton_handler::on_session_local_open(proton_event &e) { on_unhandled(e); } -void proton_handler::on_session_local_close(proton_event &e) { on_unhandled(e); } -void proton_handler::on_session_remote_open(proton_event &e) { on_unhandled(e); } -void proton_handler::on_session_remote_close(proton_event &e) { on_unhandled(e); } -void proton_handler::on_session_final(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_init(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_local_open(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_local_close(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_local_detach(proton_event &e) { 
on_unhandled(e); } -void proton_handler::on_link_remote_open(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_remote_close(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_remote_detach(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_flow(proton_event &e) { on_unhandled(e); } -void proton_handler::on_link_final(proton_event &e) { on_unhandled(e); } -void proton_handler::on_delivery(proton_event &e) { on_unhandled(e); } -void proton_handler::on_transport(proton_event &e) { on_unhandled(e); } -void proton_handler::on_transport_error(proton_event &e) { on_unhandled(e); } -void proton_handler::on_transport_head_closed(proton_event &e) { on_unhandled(e); } -void proton_handler::on_transport_tail_closed(proton_event &e) { on_unhandled(e); } -void proton_handler::on_transport_closed(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_init(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_updated(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_readable(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_writable(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_expired(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_error(proton_event &e) { on_unhandled(e); } -void proton_handler::on_selectable_final(proton_event &e) { on_unhandled(e); } - -void proton_handler::on_unhandled(proton_event &) {} - -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/reactor.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/reactor.cpp b/proton-c/bindings/cpp/src/reactor.cpp deleted file mode 100644 index 39f670f..0000000 --- a/proton-c/bindings/cpp/src/reactor.cpp +++ /dev/null @@ -1,95 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor 
license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "reactor.hpp" -#include "acceptor.hpp" - -#include "proton/connection.hpp" -#include "proton/url.hpp" - -#include "contexts.hpp" -#include "proton_bits.hpp" - -#include <proton/reactor.h> - -namespace proton { - -reactor reactor::create() { - return internal::take_ownership(pn_reactor()).get(); -} - -void reactor::run() { pn_reactor_run(pn_object()); } -void reactor::start() { pn_reactor_start(pn_object()); } -bool reactor::process() { return pn_reactor_process(pn_object()); } -void reactor::stop() { pn_reactor_stop(pn_object()); } -void reactor::wakeup() { pn_reactor_wakeup(pn_object()); } -bool reactor::quiesced() { return pn_reactor_quiesced(pn_object()); } -void reactor::yield() { pn_reactor_yield(pn_object()); } -timestamp reactor::mark() { return timestamp(pn_reactor_mark(pn_object())); } -timestamp reactor::now() { return timestamp(pn_reactor_now(pn_object())); } - -void acceptor::close() { pn_acceptor_close(pn_object()); } - -acceptor reactor::listen(const url& url){ - return make_wrapper(pn_reactor_acceptor(pn_object(), url.host().c_str(), url.port().c_str(), 0)); -} - -void reactor::schedule(int delay, pn_handler_t* handler) { - pn_reactor_schedule(pn_object(), delay, handler); -} - -connection reactor::connection(pn_handler_t* 
h) const { - return make_wrapper(pn_reactor_connection(pn_object(), h)); -} - -connection reactor::connection_to_host(const std::string &host, const std::string &port, pn_handler_t* h) const { - return make_wrapper(pn_reactor_connection_to_host(pn_object(), host.c_str(), port.c_str(), h)); -} - -void reactor::pn_handler(pn_handler_t* h) { - pn_reactor_set_handler(pn_object(), h); -} - -pn_handler_t* reactor::pn_handler() const { - return pn_reactor_get_handler(pn_object()); -} - -void reactor::pn_global_handler(pn_handler_t* h) { - pn_reactor_set_global_handler(pn_object(), h); -} - -pn_handler_t* reactor::pn_global_handler() const { - return pn_reactor_get_global_handler(pn_object()); -} - -duration reactor::timeout() { - pn_millis_t tmo = pn_reactor_get_timeout(pn_object()); - if (tmo == PN_MILLIS_MAX) - return duration::FOREVER; - return duration(tmo); -} - -void reactor::timeout(duration timeout) { - if (timeout == duration::FOREVER || timeout.milliseconds() > PN_MILLIS_MAX) - pn_reactor_set_timeout(pn_object(), PN_MILLIS_MAX); - else - pn_reactor_set_timeout(pn_object(), timeout.milliseconds()); -} - -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/receiver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp deleted file mode 100644 index 68d55d0..0000000 --- a/proton-c/bindings/cpp/src/receiver.cpp +++ /dev/null @@ -1,98 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "proton/receiver.hpp" - -#include "proton/error.hpp" -#include "proton/link.hpp" -#include "proton/receiver_options.hpp" -#include "proton/source.hpp" -#include "proton/target.hpp" - -#include "msg.hpp" -#include "proton_bits.hpp" -#include "contexts.hpp" - -#include <proton/connection.h> -#include <proton/session.h> -#include <proton/link.h> -#include <proton/event.h> -#include <proton/reactor.h> - -namespace proton { - -receiver::receiver(pn_link_t* r): link(make_wrapper(r)) {} - -void receiver::open() { - attach(); -} - -void receiver::open(const receiver_options &opts) { - opts.apply(*this); - attach(); -} - -class source receiver::source() const { - return proton::source(*this); -} - -class target receiver::target() const { - return proton::target(*this); -} - -void receiver::add_credit(uint32_t credit) { - link_context &ctx = link_context::get(pn_object()); - if (ctx.draining) - ctx.pending_credit += credit; - else - pn_link_flow(pn_object(), credit); -} - -void receiver::drain() { - link_context &ctx = link_context::get(pn_object()); - if (ctx.draining) - throw proton::error("drain already in progress"); - else { - ctx.draining = true; - if (credit() > 0) - pn_link_set_drain(pn_object(), true); - else { - // Drain is already complete. No state to communicate over the wire. - // Create dummy flow event where "drain finish" can be detected. 
- pn_connection_t *pnc = pn_session_connection(pn_link_session(pn_object())); - pn_collector_put(pn_connection_collector(pnc), PN_OBJECT, pn_object(), PN_LINK_FLOW); - } - } -} - -receiver_iterator receiver_iterator::operator++() { - if (!!obj_) { - pn_link_t *lnk = pn_link_next(obj_.pn_object(), 0); - while (lnk) { - if (pn_link_is_receiver(lnk) && pn_link_session(lnk) == session_) - break; - lnk = pn_link_next(lnk, 0); - } - obj_ = lnk; - } - return *this; -} - -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/receiver_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/receiver_options.cpp b/proton-c/bindings/cpp/src/receiver_options.cpp deleted file mode 100644 index 4a4d80f..0000000 --- a/proton-c/bindings/cpp/src/receiver_options.cpp +++ /dev/null @@ -1,130 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "proton/receiver_options.hpp" -#include "proton/messaging_handler.hpp" -#include "proton/source_options.hpp" -#include "proton/target_options.hpp" - -#include <proton/link.h> - -#include "contexts.hpp" -#include "container_impl.hpp" -#include "messaging_adapter.hpp" -#include "proton_bits.hpp" - -namespace proton { - -template <class T> struct option { - T value; - bool set; - - option() : value(), set(false) {} - option& operator=(const T& x) { value = x; set = true; return *this; } - void update(const option<T>& x) { if (x.set) *this = x.value; } -}; - -class receiver_options::impl { - static link_context& get_context(receiver l) { - return link_context::get(unwrap(l)); - } - - static void set_delivery_mode(receiver l, proton::delivery_mode mode) { - switch (mode) { - case delivery_mode::AT_MOST_ONCE: - pn_link_set_snd_settle_mode(unwrap(l), PN_SND_SETTLED); - break; - case delivery_mode::AT_LEAST_ONCE: - pn_link_set_snd_settle_mode(unwrap(l), PN_SND_UNSETTLED); - pn_link_set_rcv_settle_mode(unwrap(l), PN_RCV_FIRST); - break; - default: - break; - } - } - - public: - option<messaging_handler*> handler; - option<proton::delivery_mode> delivery_mode; - option<bool> auto_accept; - option<bool> auto_settle; - option<int> credit_window; - option<bool> dynamic_address; - option<source_options> source; - option<target_options> target; - - - void apply(receiver& r) { - if (r.uninitialized()) { - if (delivery_mode.set) set_delivery_mode(r, delivery_mode.value); - if (handler.set && handler.value) container::impl::set_handler(r, handler.value); - if (auto_settle.set) get_context(r).auto_settle = auto_settle.value; - if (auto_accept.set) get_context(r).auto_accept = auto_accept.value; - if (credit_window.set) get_context(r).credit_window = credit_window.value; - - if (source.set) { - proton::source local_s(make_wrapper<proton::source>(pn_link_source(unwrap(r)))); - source.value.apply(local_s); - } - if (target.set) { - proton::target 
local_t(make_wrapper<proton::target>(pn_link_target(unwrap(r)))); - target.value.apply(local_t); - } - } - } - - void update(const impl& x) { - handler.update(x.handler); - delivery_mode.update(x.delivery_mode); - auto_accept.update(x.auto_accept); - auto_settle.update(x.auto_settle); - credit_window.update(x.credit_window); - dynamic_address.update(x.dynamic_address); - source.update(x.source); - target.update(x.target); - } - -}; - -receiver_options::receiver_options() : impl_(new impl()) {} -receiver_options::receiver_options(const receiver_options& x) : impl_(new impl()) { - *this = x; -} -receiver_options::~receiver_options() {} - -receiver_options& receiver_options::operator=(const receiver_options& x) { - *impl_ = *x.impl_; - return *this; -} - -void receiver_options::update(const receiver_options& x) { impl_->update(*x.impl_); } - -receiver_options& receiver_options::handler(class messaging_handler &h) { impl_->handler = &h; return *this; } -receiver_options& receiver_options::delivery_mode(proton::delivery_mode m) {impl_->delivery_mode = m; return *this; } -receiver_options& receiver_options::auto_accept(bool b) {impl_->auto_accept = b; return *this; } -receiver_options& receiver_options::auto_settle(bool b) {impl_->auto_settle = b; return *this; } -receiver_options& receiver_options::credit_window(int w) {impl_->credit_window = w; return *this; } -receiver_options& receiver_options::source(source_options &s) {impl_->source = s; return *this; } -receiver_options& receiver_options::target(target_options &s) {impl_->target = s; return *this; } - -void receiver_options::apply(receiver& r) const { impl_->apply(r); } - -} // namespace proton http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/reconnect_timer.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/reconnect_timer.cpp b/proton-c/bindings/cpp/src/reconnect_timer.cpp deleted file mode 100644 index 
c63f8a1..0000000 --- a/proton-c/bindings/cpp/src/reconnect_timer.cpp +++ /dev/null @@ -1,65 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "proton/reconnect_timer.hpp" -#include "proton/error.hpp" -#include "msg.hpp" -#include <proton/types.h> -#include <proton/reactor.h> - -namespace proton { - -reconnect_timer::reconnect_timer(uint32_t first, int32_t max, uint32_t increment, - bool doubling, int32_t max_retries, int32_t timeout) : - first_delay_(first), max_delay_(max), increment_(increment), doubling_(doubling), - max_retries_(max_retries), timeout_(timeout), retries_(0), next_delay_(-1), timeout_deadline_(0) - {} - -void reconnect_timer::reset() { - retries_ = 0; - next_delay_ = 0; - timeout_deadline_ = 0; -} - -int reconnect_timer::next_delay(timestamp now) { - retries_++; - if (max_retries_ >= 0 && retries_ > max_retries_) - return -1; - - if (retries_ == 1) { - if (timeout_ >= duration(0)) - timeout_deadline_ = now + timeout_; - next_delay_ = first_delay_; - } else if (retries_ == 2) { - next_delay_ = next_delay_ + increment_; - } else { - next_delay_ = next_delay_ + ( doubling_ ? 
next_delay_ : increment_ ); - } - if (timeout_deadline_ != timestamp(0) && now >= timeout_deadline_) - return -1; - if (max_delay_ >= duration(0) && next_delay_ > max_delay_) - next_delay_ = max_delay_; - if (timeout_deadline_ != timestamp(0) && (now + next_delay_ > timeout_deadline_)) - next_delay_ = timeout_deadline_ - now; - return next_delay_.milliseconds(); -} - -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/sasl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/sasl.cpp b/proton-c/bindings/cpp/src/sasl.cpp deleted file mode 100644 index 73668cd..0000000 --- a/proton-c/bindings/cpp/src/sasl.cpp +++ /dev/null @@ -1,38 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "proton/sasl.hpp" - -namespace proton { - -enum sasl::outcome sasl::outcome() const { return static_cast<enum outcome>(pn_sasl_outcome(object_)); } - -std::string sasl::user() const { - const char *name = pn_sasl_get_user(object_); - return name ? std::string(name) : std::string(); -} - -std::string sasl::mech() const { - const char *m = pn_sasl_get_mech(object_); - return m ? 
std::string(m) : std::string(); -} - -} // namespace http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/scalar_base.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/scalar_base.cpp b/proton-c/bindings/cpp/src/scalar_base.cpp deleted file mode 100644 index 840a6e8..0000000 --- a/proton-c/bindings/cpp/src/scalar_base.cpp +++ /dev/null @@ -1,173 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#include "msg.hpp" -#include "types_internal.hpp" - -#include "proton/binary.hpp" -#include "proton/decimal.hpp" -#include "proton/internal/type_traits.hpp" -#include "proton/scalar_base.hpp" -#include "proton/symbol.hpp" -#include "proton/timestamp.hpp" -#include "proton/uuid.hpp" - -#include <ostream> -#include <sstream> - -namespace proton { - -scalar_base::scalar_base() { atom_.type = PN_NULL; } -scalar_base::scalar_base(const pn_atom_t& a) { set(a); } -scalar_base::scalar_base(const scalar_base& x) { set(x.atom_); } - -scalar_base& scalar_base::operator=(const scalar_base& x) { - if (this != &x) - set(x.atom_); - return *this; -} - -type_id scalar_base::type() const { return type_id(atom_.type); } - -bool scalar_base::empty() const { return type() == NULL_TYPE; } - -void scalar_base::set(const binary& x, pn_type_t t) { - atom_.type = t; - bytes_ = x; - atom_.u.as_bytes = pn_bytes(bytes_); -} - -void scalar_base::set(const pn_atom_t& atom) { - if (type_id_is_string_like(type_id(atom.type))) { - set(bin(atom.u.as_bytes), atom.type); - } else { - atom_ = atom; - bytes_.clear(); - } -} - -void scalar_base::put_(bool x) { atom_.u.as_bool = x; atom_.type = PN_BOOL; } -void scalar_base::put_(uint8_t x) { atom_.u.as_ubyte = x; atom_.type = PN_UBYTE; } -void scalar_base::put_(int8_t x) { atom_.u.as_byte = x; atom_.type = PN_BYTE; } -void scalar_base::put_(uint16_t x) { atom_.u.as_ushort = x; atom_.type = PN_USHORT; } -void scalar_base::put_(int16_t x) { atom_.u.as_short = x; atom_.type = PN_SHORT; } -void scalar_base::put_(uint32_t x) { atom_.u.as_uint = x; atom_.type = PN_UINT; } -void scalar_base::put_(int32_t x) { atom_.u.as_int = x; atom_.type = PN_INT; } -void scalar_base::put_(uint64_t x) { atom_.u.as_ulong = x; atom_.type = PN_ULONG; } -void scalar_base::put_(int64_t x) { atom_.u.as_long = x; atom_.type = PN_LONG; } -void scalar_base::put_(wchar_t x) { atom_.u.as_char = x; atom_.type = PN_CHAR; } -void scalar_base::put_(float x) { atom_.u.as_float = x; 
atom_.type = PN_FLOAT; } -void scalar_base::put_(double x) { atom_.u.as_double = x; atom_.type = PN_DOUBLE; } -void scalar_base::put_(timestamp x) { atom_.u.as_timestamp = x.milliseconds(); atom_.type = PN_TIMESTAMP; } -void scalar_base::put_(const decimal32& x) { byte_copy(atom_.u.as_decimal32, x); atom_.type = PN_DECIMAL32;; } -void scalar_base::put_(const decimal64& x) { byte_copy(atom_.u.as_decimal64, x); atom_.type = PN_DECIMAL64; } -void scalar_base::put_(const decimal128& x) { byte_copy(atom_.u.as_decimal128, x); atom_.type = PN_DECIMAL128; } -void scalar_base::put_(const uuid& x) { byte_copy(atom_.u.as_uuid, x); atom_.type = PN_UUID; } -void scalar_base::put_(const std::string& x) { set(binary(x), PN_STRING); } -void scalar_base::put_(const symbol& x) { set(binary(x), PN_SYMBOL); } -void scalar_base::put_(const binary& x) { set(x, PN_BINARY); } -void scalar_base::put_(const char* x) { set(binary(std::string(x)), PN_STRING); } -void scalar_base::put_(const null&) { atom_.type = PN_NULL; } - -void scalar_base::ok(pn_type_t t) const { - if (atom_.type != t) throw make_conversion_error(type_id(t), type()); -} - -void scalar_base::get_(bool& x) const { ok(PN_BOOL); x = atom_.u.as_bool; } -void scalar_base::get_(uint8_t& x) const { ok(PN_UBYTE); x = atom_.u.as_ubyte; } -void scalar_base::get_(int8_t& x) const { ok(PN_BYTE); x = atom_.u.as_byte; } -void scalar_base::get_(uint16_t& x) const { ok(PN_USHORT); x = atom_.u.as_ushort; } -void scalar_base::get_(int16_t& x) const { ok(PN_SHORT); x = atom_.u.as_short; } -void scalar_base::get_(uint32_t& x) const { ok(PN_UINT); x = atom_.u.as_uint; } -void scalar_base::get_(int32_t& x) const { ok(PN_INT); x = atom_.u.as_int; } -void scalar_base::get_(wchar_t& x) const { ok(PN_CHAR); x = wchar_t(atom_.u.as_char); } -void scalar_base::get_(uint64_t& x) const { ok(PN_ULONG); x = atom_.u.as_ulong; } -void scalar_base::get_(int64_t& x) const { ok(PN_LONG); x = atom_.u.as_long; } -void scalar_base::get_(timestamp& x) const { 
ok(PN_TIMESTAMP); x = atom_.u.as_timestamp; } -void scalar_base::get_(float& x) const { ok(PN_FLOAT); x = atom_.u.as_float; } -void scalar_base::get_(double& x) const { ok(PN_DOUBLE); x = atom_.u.as_double; } -void scalar_base::get_(decimal32& x) const { ok(PN_DECIMAL32); byte_copy(x, atom_.u.as_decimal32); } -void scalar_base::get_(decimal64& x) const { ok(PN_DECIMAL64); byte_copy(x, atom_.u.as_decimal64); } -void scalar_base::get_(decimal128& x) const { ok(PN_DECIMAL128); byte_copy(x, atom_.u.as_decimal128); } -void scalar_base::get_(uuid& x) const { ok(PN_UUID); byte_copy(x, atom_.u.as_uuid); } -void scalar_base::get_(std::string& x) const { ok(PN_STRING); x = std::string(bytes_.begin(), bytes_.end()); } -void scalar_base::get_(symbol& x) const { ok(PN_SYMBOL); x = symbol(bytes_.begin(), bytes_.end()); } -void scalar_base::get_(binary& x) const { ok(PN_BINARY); x = bytes_; } -void scalar_base::get_(null&) const { ok(PN_NULL); } - -namespace { - -struct equal_op { - const scalar_base& x; - equal_op(const scalar_base& s) : x(s) {} - template<class T> bool operator()(const T& y) { return (internal::get<T>(x) == y); } -}; - -struct less_op { - const scalar_base& x; - less_op(const scalar_base& s) : x(s) {} - template<class T> bool operator()(const T& y) { return (y < internal::get<T>(x)); } -}; - -struct ostream_op { - std::ostream& o; - ostream_op(std::ostream& o_) : o(o_) {} - template<class T> std::ostream& operator()(const T& x) { return o << x; } -}; - -} // namespace - -bool operator==(const scalar_base& x, const scalar_base& y) { - if (x.type() != y.type()) return false; - if (x.type() == NULL_TYPE) return true; - return internal::visit<bool>(x, equal_op(y)); -} - -bool operator<(const scalar_base& x, const scalar_base& y) { - if (x.type() != y.type()) return x.type() < y.type(); - if (x.type() == NULL_TYPE) return false; - return internal::visit<bool>(x, less_op(y)); -} - -std::ostream& operator<<(std::ostream& o, const scalar_base& s) { - switch (s.type()) 
{ - case NULL_TYPE: return o; // NULL is empty, doesn't print (like empty string) - // Print byte types as integer, not char. - case BYTE: return o << static_cast<int>(internal::get<int8_t>(s)); - case UBYTE: return o << static_cast<unsigned int>(internal::get<uint8_t>(s)); - // Other types printed using normal C++ operator << - default: return internal::visit<std::ostream&>(s, ostream_op(o)); - } -} - -namespace internal { - -conversion_error make_coercion_error(const char* cpp, type_id amqp) { - return conversion_error(std::string("invalid proton::coerce<") + cpp + ">(" + type_name(amqp) + ")"); -} - -} // internal - -std::string to_string(const scalar_base& x) { - std::ostringstream os; - os << std::boolalpha << x; - return os.str(); -} - -} // proton http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/scalar_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/scalar_test.cpp b/proton-c/bindings/cpp/src/scalar_test.cpp deleted file mode 100644 index 54be0b9..0000000 --- a/proton-c/bindings/cpp/src/scalar_test.cpp +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -#include "scalar_test.hpp" - -namespace { - -using namespace std; -using namespace proton; - -using test::scalar_test_group; - -// NOTE: proton::coerce<> and bad proton::get() are tested in value_test to avoid redundant test code. - -void encode_decode_test() { - value v; - scalar a("foo"); - v = a; // Assignment to value does encode, get<> does decode. - ASSERT_EQUAL(v, a); - ASSERT_EQUAL(std::string("foo"), get<std::string>(v)); - scalar a2 = get<scalar>(v); - ASSERT_EQUAL(std::string("foo"), get<std::string>(a2)); -} - -void message_id_test() { - ASSERT_EQUAL(23, coerce<int64_t>(message_id(23))); - ASSERT_EQUAL(23u, get<uint64_t>(message_id(23))); - ASSERT(message_id("foo") != message_id(binary("foo"))); - ASSERT_EQUAL(scalar("foo"), message_id("foo")); - ASSERT_EQUAL("foo", coerce<std::string>(message_id("foo"))); - ASSERT(message_id("a") < message_id("z")); - uuid r = uuid::random(); - ASSERT_EQUAL(r, get<uuid>(message_id(r))); -} - -void annotation_key_test() { - ASSERT_EQUAL(23, coerce<int64_t>(annotation_key(23))); - ASSERT_EQUAL(23u, get<uint64_t>(annotation_key(23))); - ASSERT_EQUAL("foo", coerce<std::string>(annotation_key("foo"))); - ASSERT_EQUAL(scalar(symbol("foo")), annotation_key("foo")); -} - -template <class T> T make(const char c) { T x; std::fill(x.begin(), x.end(), c); return x; } - -} - -int main(int, char**) { - int failed = 0; - scalar_test_group<scalar>(failed); - - RUN_TEST(failed, encode_decode_test()); - RUN_TEST(failed, message_id_test()); - RUN_TEST(failed, annotation_key_test()); - return failed; -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/sender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/sender.cpp b/proton-c/bindings/cpp/src/sender.cpp deleted file mode 100644 index 70e9fa6..0000000 --- a/proton-c/bindings/cpp/src/sender.cpp +++ /dev/null @@ -1,100 +0,0 @@ -/* - * - * Licensed to the Apache Software 
Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "proton/sender.hpp" - -#include "proton/link.hpp" -#include "proton/sender_options.hpp" -#include "proton/source.hpp" -#include "proton/target.hpp" -#include "proton/tracker.hpp" - -#include <proton/delivery.h> -#include <proton/link.h> -#include <proton/types.h> - -#include "proton_bits.hpp" -#include "contexts.hpp" - -#include <assert.h> - -namespace proton { - -sender::sender(pn_link_t *l): link(make_wrapper(l)) {} - -void sender::open() { - attach(); -} - -void sender::open(const sender_options &opts) { - opts.apply(*this); - attach(); -} - -class source sender::source() const { - return proton::source(*this); -} - -class target sender::target() const { - return proton::target(*this); -} - -namespace { -// TODO: revisit if thread safety required -uint64_t tag_counter = 0; -} - -tracker sender::send(const message &message) { - uint64_t id = ++tag_counter; - pn_delivery_t *dlv = - pn_delivery(pn_object(), pn_dtag(reinterpret_cast<const char*>(&id), sizeof(id))); - std::vector<char> buf; - message.encode(buf); - assert(!buf.empty()); - pn_link_send(pn_object(), &buf[0], buf.size()); - pn_link_advance(pn_object()); - if (pn_link_snd_settle_mode(pn_object()) == PN_SND_SETTLED) - 
pn_delivery_settle(dlv); - if (!pn_link_credit(pn_object())) - link_context::get(pn_object()).draining = false; - return make_wrapper<tracker>(dlv); -} - -void sender::return_credit() { - link_context &lctx = link_context::get(pn_object()); - lctx.draining = false; - pn_link_drained(pn_object()); -} - -sender_iterator sender_iterator::operator++() { - if (!!obj_) { - pn_link_t *lnk = pn_link_next(obj_.pn_object(), 0); - while (lnk) { - if (pn_link_is_sender(lnk) && pn_link_session(lnk) == session_) - break; - lnk = pn_link_next(lnk, 0); - } - obj_ = lnk; - } - return *this; -} - -} http://git-wip-us.apache.org/repos/asf/qpid-proton-j/blob/2f85988e/proton-c/bindings/cpp/src/sender_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/sender_options.cpp b/proton-c/bindings/cpp/src/sender_options.cpp deleted file mode 100644 index 4f501e6..0000000 --- a/proton-c/bindings/cpp/src/sender_options.cpp +++ /dev/null @@ -1,116 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -#include "proton/sender_options.hpp" -#include "proton/messaging_handler.hpp" -#include "proton/source_options.hpp" -#include "proton/target_options.hpp" - -#include "container_impl.hpp" -#include "contexts.hpp" -#include "messaging_adapter.hpp" -#include "proton_bits.hpp" - -namespace proton { - -template <class T> struct option { - T value; - bool set; - - option() : value(), set(false) {} - option& operator=(const T& x) { value = x; set = true; return *this; } - void update(const option<T>& x) { if (x.set) *this = x.value; } -}; - -class sender_options::impl { - static link_context& get_context(sender l) { - return link_context::get(unwrap(l)); - } - - static void set_delivery_mode(sender l, proton::delivery_mode mode) { - switch (mode) { - case delivery_mode::AT_MOST_ONCE: - pn_link_set_snd_settle_mode(unwrap(l), PN_SND_SETTLED); - break; - case delivery_mode::AT_LEAST_ONCE: - pn_link_set_snd_settle_mode(unwrap(l), PN_SND_UNSETTLED); - pn_link_set_rcv_settle_mode(unwrap(l), PN_RCV_FIRST); - break; - default: - break; - } - } - - public: - option<messaging_handler*> handler; - option<proton::delivery_mode> delivery_mode; - option<bool> auto_settle; - option<source_options> source; - option<target_options> target; - - void apply(sender& s) { - if (s.uninitialized()) { - if (delivery_mode.set) set_delivery_mode(s, delivery_mode.value); - if (handler.set && handler.value) container::impl::set_handler(s, handler.value); - if (auto_settle.set) get_context(s).auto_settle = auto_settle.value; - if (source.set) { - proton::source local_s(make_wrapper<proton::source>(pn_link_source(unwrap(s)))); - source.value.apply(local_s); - } - if (target.set) { - proton::target local_t(make_wrapper<proton::target>(pn_link_target(unwrap(s)))); - target.value.apply(local_t); - } - } - } - - void update(const impl& x) { - handler.update(x.handler); - delivery_mode.update(x.delivery_mode); - auto_settle.update(x.auto_settle); - source.update(x.source); - 
target.update(x.target); - } - -}; - -sender_options::sender_options() : impl_(new impl()) {} -sender_options::sender_options(const sender_options& x) : impl_(new impl()) { - *this = x; -} -sender_options::~sender_options() {} - -sender_options& sender_options::operator=(const sender_options& x) { - *impl_ = *x.impl_; - return *this; -} - -void sender_options::update(const sender_options& x) { impl_->update(*x.impl_); } - -sender_options& sender_options::handler(class messaging_handler &h) { impl_->handler = &h; return *this; } -sender_options& sender_options::delivery_mode(proton::delivery_mode m) {impl_->delivery_mode = m; return *this; } -sender_options& sender_options::auto_settle(bool b) {impl_->auto_settle = b; return *this; } -sender_options& sender_options::source(const source_options &s) {impl_->source = s; return *this; } -sender_options& sender_options::target(const target_options &s) {impl_->target = s; return *this; } - -void sender_options::apply(sender& s) const { impl_->apply(s); } - -} // namespace proton --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
