http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/Url.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Url.hpp b/proton-c/bindings/cpp/src/Url.hpp deleted file mode 100644 index 3c2e450..0000000 --- a/proton-c/bindings/cpp/src/Url.hpp +++ /dev/null @@ -1,49 +0,0 @@ -#ifndef PROTON_CPP_URL_H -#define PROTON_CPP_URL_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "proton/export.hpp" -#include "proton/ProtonHandle.hpp" -#include "proton/url.h" -#include <string> - -namespace proton { -namespace reactor { - -class Url : public ProtonHandle<pn_url_t> -{ - public: - PN_CPP_EXTERN Url(const std::string &url); - PN_CPP_EXTERN ~Url(); - PN_CPP_EXTERN Url(const Url&); - PN_CPP_EXTERN Url& operator=(const Url&); - PN_CPP_EXTERN std::string getHost(); - PN_CPP_EXTERN std::string getPort(); - PN_CPP_EXTERN std::string getPath(); - private: - friend class ProtonImplRef<Url>; -}; - - -}} // namespace proton::reactor - -#endif /*!PROTON_CPP_URL_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/Value.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Value.cpp b/proton-c/bindings/cpp/src/Value.cpp deleted file mode 100644 index cf41175..0000000 --- a/proton-c/bindings/cpp/src/Value.cpp +++ /dev/null @@ -1,136 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "proton/Value.hpp" -#include "proton_bits.hpp" -#include <proton/codec.h> -#include <ostream> -#include <algorithm> - -namespace proton { - -Value::Value() { *this = Null(); } -Value::Value(const Value& v) { *this = v; } -Value::~Value() {} - -Value& Value::operator=(const Value& v) { values = v.values; return *this; } - -TypeId Value::type() const { - const_cast<Values&>(values).rewind(); - return values.type(); -} - -namespace { -template <class T> T check(T result) { - if (result < 0) - throw EncodeError("encode: " + errorStr(result)); - return result; -} -} - -std::ostream& operator<<(std::ostream& o, const Value& v) { - return o << v.values; -} - -namespace { - -// Compare nodes, return -1 if a<b, 0 if a==b, +1 if a>b -// Forward-declare so we can use it recursively. -int compareNext(Values& a, Values& b); - -template <class T> int compare(const T& a, const T& b) { - if (a < b) return -1; - else if (a > b) return +1; - else return 0; -} - -int compareContainer(Values& a, Values& b) { - Decoder::Scope sa(a), sb(b); - // Compare described vs. not-described. - int cmp = compare(sa.isDescribed, sb.isDescribed); - if (cmp) return cmp; - // Lexical sort (including descriptor if there is one) - size_t minSize = std::min(sa.size, sb.size) + int(sa.isDescribed); - for (size_t i = 0; i < minSize; ++i) { - cmp = compareNext(a, b); - if (cmp) return cmp; - } - return compare(sa.size, sb.size); -} - -template <class T> int compareSimple(Values& a, Values& b) { - T va, vb; - a >> va; - b >> vb; - return compare(va, vb); -} - -int compareNext(Values& a, Values& b) { - // Sort by TypeId first. - TypeId ta = a.type(), tb = b.type(); - int cmp = compare(ta, tb); - if (cmp) return cmp; - - switch (ta) { - case NULL_: return 0; - case ARRAY: - case LIST: - case MAP: - case DESCRIBED: - return compareContainer(a, b); - case BOOL: return compareSimple<Bool>(a, b); - case UBYTE: return compareSimple<Ubyte>(a, b); - case BYTE: return compareSimple<Byte>(a, b); - case USHORT: return compareSimple<Ushort>(a, b); - case SHORT: return compareSimple<Short>(a, b); - case UINT: return compareSimple<Uint>(a, b); - case INT: return compareSimple<Int>(a, b); - case CHAR: return compareSimple<Char>(a, b); - case ULONG: return compareSimple<Ulong>(a, b); - case LONG: return compareSimple<Long>(a, b); - case TIMESTAMP: return compareSimple<Timestamp>(a, b); - case FLOAT: return compareSimple<Float>(a, b); - case DOUBLE: return compareSimple<Double>(a, b); - case DECIMAL32: return compareSimple<Decimal32>(a, b); - case DECIMAL64: return compareSimple<Decimal64>(a, b); - case DECIMAL128: return compareSimple<Decimal128>(a, b); - case UUID: return compareSimple<Uuid>(a, b); - case BINARY: return compareSimple<Binary>(a, b); - case STRING: return compareSimple<String>(a, b); - case SYMBOL: return compareSimple<Symbol>(a, b); - } - // Invalid but equal TypeId, treat as equal. - return 0; -} - -} // namespace - -bool Value::operator==(const Value& v) const { - values.rewind(); - v.values.rewind(); - return compareNext(values, v.values) == 0; -} - -bool Value::operator<(const Value& v) const { - values.rewind(); - v.values.rewind(); - return compareNext(values, v.values) < 0; -} - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/Values.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Values.cpp b/proton-c/bindings/cpp/src/Values.cpp deleted file mode 100644 index 95e5784..0000000 --- a/proton-c/bindings/cpp/src/Values.cpp +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -#include "proton/Value.hpp" -#include "proton_bits.hpp" -#include <proton/codec.h> -#include <ostream> - -namespace proton { - -Values::Values() {} -Values::Values(const Values& v) { *this = v; } -Values::Values(pn_data_t* d) : Data(d) {} - -Values::~Values() {} -Values& Values::operator=(const Values& v) { Data::operator=(v); return *this; } - -Values& Values::rewind() { pn_data_rewind(data); return *this; } - -std::ostream& operator<<(std::ostream& o, const Values& v) { - return o << static_cast<const Encoder&>(v); -} - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/acceptor.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/acceptor.cpp b/proton-c/bindings/cpp/src/acceptor.cpp new file mode 100644 index 0000000..990bfc3 --- /dev/null +++ b/proton-c/bindings/cpp/src/acceptor.cpp @@ -0,0 +1,55 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/acceptor.hpp" +#include "proton/error.hpp" +#include "proton_impl_ref.hpp" +#include "msg.hpp" + +namespace proton { + +template class proton_handle<pn_acceptor_t>; +typedef proton_impl_ref<acceptor> PI; + +acceptor::acceptor() {} + +acceptor::acceptor(pn_acceptor_t *a) +{ + PI::ctor(*this, a); +} + +acceptor::~acceptor() { PI::dtor(*this); } + + +acceptor::acceptor(const acceptor& a) : proton_handle<pn_acceptor_t>() { + PI::copy(*this, a); +} + +acceptor& acceptor::operator=(const acceptor& a) { + return PI::assign(*this, a); +} + +void acceptor::close() { + if (impl_) + pn_acceptor_close(impl_); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/acking.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/acking.cpp b/proton-c/bindings/cpp/src/acking.cpp new file mode 100644 index 0000000..5738257 --- /dev/null +++ b/proton-c/bindings/cpp/src/acking.cpp @@ -0,0 +1,48 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/acking.hpp" +#include "proton/delivery.h" + +namespace proton { + +void acking::accept(delivery &d) { + settle(d, delivery::ACCEPTED); +} + +void acking::reject(delivery &d) { + settle(d, delivery::REJECTED); +} + +void acking::release(delivery &d, bool delivered) { + if (delivered) + settle(d, delivery::MODIFIED); + else + settle(d, delivery::RELEASED); +} + +void acking::settle(delivery &d, delivery::state state) { + if (state) + pn_delivery_update(d.pn_delivery(), state); + d.settle(); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/blocking_connection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_connection.cpp b/proton-c/bindings/cpp/src/blocking_connection.cpp new file mode 100644 index 0000000..5bd790b --- /dev/null +++ b/proton-c/bindings/cpp/src/blocking_connection.cpp @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/container.hpp" +#include "proton/blocking_connection.hpp" +#include "proton/blocking_sender.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/error.hpp" +#include "msg.hpp" +#include "blocking_connection_impl.hpp" +#include "private_impl_ref.hpp" + +namespace proton { + +template class handle<blocking_connection_impl>; +typedef private_impl_ref<blocking_connection> PI; + +blocking_connection::blocking_connection() {PI::ctor(*this, 0); } + +blocking_connection::blocking_connection(const blocking_connection& c) : handle<blocking_connection_impl>() { PI::copy(*this, c); } + +blocking_connection& blocking_connection::operator=(const blocking_connection& c) { return PI::assign(*this, c); } +blocking_connection::~blocking_connection() { PI::dtor(*this); } + +blocking_connection::blocking_connection(std::string &url, duration d, ssl_domain *ssld, container *c) { + blocking_connection_impl *cimpl = new blocking_connection_impl(url, d,ssld, c); + PI::ctor(*this, cimpl); +} + +void blocking_connection::close() { impl_->close(); } + +void blocking_connection::wait(wait_condition &cond) { return impl_->wait(cond); } +void blocking_connection::wait(wait_condition &cond, std::string &msg, duration timeout) { + return impl_->wait(cond, msg, timeout); +} + +blocking_sender blocking_connection::create_sender(std::string &address, handler *h) { + sender sender = impl_->container_.create_sender(impl_->connection_, address, h); + return blocking_sender(*this, sender); +} + +duration blocking_connection::timeout() { return impl_->timeout(); } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/blocking_connection_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.cpp b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp new file mode 100644 index 0000000..0cc824f --- /dev/null +++ b/proton-c/bindings/cpp/src/blocking_connection_impl.cpp @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/container.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/duration.hpp" +#include "proton/error.hpp" +#include "proton/wait_condition.hpp" +#include "blocking_connection_impl.hpp" +#include "msg.hpp" +#include "contexts.hpp" + +#include "proton/connection.h" + +namespace proton { + +wait_condition::~wait_condition() {} + + +void blocking_connection_impl::incref(blocking_connection_impl *impl_) { + impl_->refcount_++; +} + +void blocking_connection_impl::decref(blocking_connection_impl *impl_) { + impl_->refcount_--; + if (impl_->refcount_ == 0) + delete impl_; +} + +namespace { +struct connection_opening : public wait_condition { + connection_opening(pn_connection_t *c) : pn_connection(c) {} + bool achieved() { return (pn_connection_state(pn_connection) & PN_REMOTE_UNINIT); } + pn_connection_t *pn_connection; +}; + +struct connection_closed : public wait_condition { + connection_closed(pn_connection_t *c) : pn_connection(c) {} + bool achieved() { return !(pn_connection_state(pn_connection) & PN_REMOTE_ACTIVE); } + pn_connection_t *pn_connection; +}; + +} + + +blocking_connection_impl::blocking_connection_impl(std::string &u, duration timeout0, ssl_domain *ssld, container *c) + : url_(u), timeout_(timeout0), refcount_(0) +{ + if (c) + container_ = *c; + container_.start(); + container_.timeout(timeout_); + // Create connection and send the connection events here + connection_ = container_.connect(url_, static_cast<handler *>(this)); + connection_opening cond(connection_.pn_connection()); + wait(cond); +} + +blocking_connection_impl::~blocking_connection_impl() { + container_ = container(); +} + +void blocking_connection_impl::close() { + connection_.close(); + connection_closed cond(connection_.pn_connection()); + wait(cond); +} + +void blocking_connection_impl::wait(wait_condition &condition) { + std::string empty; + wait(condition, empty, timeout_); +} + +void blocking_connection_impl::wait(wait_condition &condition, std::string &msg, duration wait_timeout) { + if (wait_timeout == duration::FOREVER) { + while (!condition.achieved()) { + container_.process(); + } + } + + pn_reactor_t *reactor = container_.reactor(); + pn_millis_t orig_timeout = pn_reactor_get_timeout(reactor); + pn_reactor_set_timeout(reactor, wait_timeout.milliseconds); + try { + pn_timestamp_t now = pn_reactor_mark(reactor); + pn_timestamp_t deadline = now + wait_timeout.milliseconds; + while (!condition.achieved()) { + container_.process(); + if (deadline < pn_reactor_mark(reactor)) { + std::string txt = "connection timed out"; + if (!msg.empty()) + txt += ": " + msg; + // TODO: proper Timeout exception + throw error(MSG(txt)); + } + } + } catch (...) { + pn_reactor_set_timeout(reactor, orig_timeout); + throw; + } + pn_reactor_set_timeout(reactor, orig_timeout); +} + + + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/blocking_connection_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp new file mode 100644 index 0000000..02305ba --- /dev/null +++ b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp @@ -0,0 +1,62 @@ +#ifndef PROTON_CPP_CONNECTIONIMPL_H +#define PROTON_CPP_CONNECTIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/export.hpp" +#include "proton/endpoint.hpp" +#include "proton/container.hpp" +#include "proton/types.h" +#include <string> + +struct pn_connection_t; + +namespace proton { + +class handler; +class container; +class ssl_domain; + + class blocking_connection_impl : public messaging_handler +{ + public: + PN_CPP_EXTERN blocking_connection_impl(std::string &url, duration d, ssl_domain *ssld, container *c); + PN_CPP_EXTERN ~blocking_connection_impl(); + PN_CPP_EXTERN void close(); + PN_CPP_EXTERN void wait(wait_condition &condition); + PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg, duration timeout); + PN_CPP_EXTERN pn_connection_t *pn_blocking_connection(); + duration timeout() { return timeout_; } + static void incref(blocking_connection_impl *); + static void decref(blocking_connection_impl *); + private: + friend class blocking_connection; + container container_; + connection connection_; + std::string url_; + duration timeout_; + int refcount_; +}; + + +} + +#endif /*!PROTON_CPP_CONNECTIONIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/blocking_link.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_link.cpp b/proton-c/bindings/cpp/src/blocking_link.cpp new file mode 100644 index 0000000..b9f23c4 --- /dev/null +++ b/proton-c/bindings/cpp/src/blocking_link.cpp @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/blocking_link.hpp" +#include "proton/blocking_connection.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/wait_condition.hpp" +#include "proton/error.hpp" +#include "msg.hpp" + + +namespace proton { + +namespace { +struct link_opened : public wait_condition { + link_opened(pn_link_t *l) : pn_link(l) {} + bool achieved() { return !(pn_link_state(pn_link) & PN_REMOTE_UNINIT); } + pn_link_t *pn_link; +}; + +struct link_closed : public wait_condition { + link_closed(pn_link_t *l) : pn_link(l) {} + bool achieved() { return (pn_link_state(pn_link) & PN_REMOTE_CLOSED); } + pn_link_t *pn_link; +}; + +struct link_not_open : public wait_condition { + link_not_open(pn_link_t *l) : pn_link(l) {} + bool achieved() { return !(pn_link_state(pn_link) & PN_REMOTE_ACTIVE); } + pn_link_t *pn_link; +}; + + +} // namespace + + +blocking_link::blocking_link(blocking_connection *c, pn_link_t *pnl) : connection_(*c), link_(pnl) { + std::string msg = "Opening link " + link_.name(); + link_opened link_opened(link_.pn_link()); + connection_.wait(link_opened, msg); +} + +blocking_link::~blocking_link() {} + +void blocking_link::wait_for_closed(duration timeout) { + std::string msg = "Closing link " + link_.name(); + link_closed link_closed(link_.pn_link()); + connection_.wait(link_closed, msg); + check_closed(); +} + +void blocking_link::check_closed() { + pn_link_t * pn_link = link_.pn_link(); + if (pn_link_state(pn_link) & PN_REMOTE_CLOSED) { + link_.close(); + // TODO: link_detached exception + throw error(MSG("link detached")); + } +} + +void blocking_link::close() { + link_.close(); + std::string msg = "Closing link " + link_.name(); + link_not_open link_not_open(link_.pn_link()); + connection_.wait(link_not_open, msg); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/blocking_sender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking_sender.cpp b/proton-c/bindings/cpp/src/blocking_sender.cpp new file mode 100644 index 0000000..2ab1ef1 --- /dev/null +++ b/proton-c/bindings/cpp/src/blocking_sender.cpp @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/blocking_sender.hpp" +#include "proton/blocking_connection.hpp" +#include "proton/wait_condition.hpp" +#include "proton/error.hpp" +#include "msg.hpp" + + +namespace proton { + +namespace { +struct delivery_settled : public wait_condition { + delivery_settled(pn_delivery_t *d) : pn_delivery(d) {} + bool achieved() { return pn_delivery_settled(pn_delivery); } + pn_delivery_t *pn_delivery; +}; + +} // namespace + + +blocking_sender::blocking_sender(blocking_connection &c, sender &l) : blocking_link(&c, l.pn_link()) { + std::string ta = link_.target().address(); + std::string rta = link_.remote_target().address(); + if (ta.empty() || ta.compare(rta) != 0) { + wait_for_closed(); + link_.close(); + std::string txt = "Failed to open sender " + link_.name() + ", target does not match"; + throw error(MSG("container not started")); + } +} + +delivery blocking_sender::send(message &msg, duration timeout) { + sender snd = link_; + delivery dlv = snd.send(msg); + std::string txt = "Sending on sender " + link_.name(); + delivery_settled cond(dlv.pn_delivery()); + connection_.wait(cond, txt, timeout); + return dlv; +} + +delivery blocking_sender::send(message &msg) { + // Use default timeout + return send(msg, connection_.timeout()); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/connection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection.cpp b/proton-c/bindings/cpp/src/connection.cpp new file mode 100644 index 0000000..6c21fdd --- /dev/null +++ b/proton-c/bindings/cpp/src/connection.cpp @@ -0,0 +1,68 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/container.hpp" +#include "proton/connection.hpp" +#include "proton/handler.hpp" +#include "proton/error.hpp" +#include "msg.hpp" +#include "contexts.hpp" +#include "connection_impl.hpp" +#include "private_impl_ref.hpp" + +#include "proton/connection.h" + +namespace proton { + +template class handle<connection_impl>; +typedef private_impl_ref<connection> PI; + +connection::connection() {PI::ctor(*this, 0); } +connection::connection(connection_impl* p) { PI::ctor(*this, p); } +connection::connection(const connection& c) : handle<connection_impl>() { PI::copy(*this, c); } + +connection& connection::operator=(const connection& c) { return PI::assign(*this, c); } +connection::~connection() { PI::dtor(*this); } + +connection::connection(class container &c, handler *h) { + connection_impl *cimpl = new connection_impl(c, h); + PI::ctor(*this, cimpl); +} + +transport &connection::transport() { return impl_->transport(); } + +handler* connection::override() { return impl_->override(); } +void connection::override(handler *h) { impl_->override(h); } + +void connection::open() { impl_->open(); } + +void connection::close() { impl_->close(); } + +pn_connection_t *connection::pn_connection() { return impl_->pn_connection(); } + +std::string connection::hostname() { return impl_->hostname(); } + +class container &connection::container() { return impl_->container(); } + +link connection::link_head(endpoint::State mask) { + return impl_->link_head(mask); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/connection_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection_impl.cpp b/proton-c/bindings/cpp/src/connection_impl.cpp new file mode 100644 index 0000000..e515d78 --- /dev/null +++ b/proton-c/bindings/cpp/src/connection_impl.cpp @@ -0,0 +1,136 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/container.hpp" +#include "proton/handler.hpp" +#include "proton/error.hpp" +#include "connection_impl.hpp" +#include "proton/transport.hpp" +#include "msg.hpp" +#include "contexts.hpp" +#include "private_impl_ref.hpp" +#include "container_impl.hpp" + +#include "proton/connection.h" + +namespace proton { + +void connection_impl::incref(connection_impl *impl_) { + impl_->refcount_++; +} + +void connection_impl::decref(connection_impl *impl_) { + impl_->refcount_--; + if (impl_->refcount_ == 0) + delete impl_; +} + +connection_impl::connection_impl(class container &c, pn_connection_t &pn_conn) + : container_(c), refcount_(0), override_(0), transport_(0), default_session_(0), + pn_connection_(&pn_conn), reactor_reference_(this) +{ + connection_context(pn_connection_, this); +} + +connection_impl::connection_impl(class container &c, handler *handler) + : container_(c), refcount_(0), override_(0), transport_(0), default_session_(0), + reactor_reference_(this) +{ + pn_handler_t *chandler = 0; + if (handler) { + container_impl *container_impl = private_impl_ref<class container>::get(c); + chandler = container_impl->wrap_handler(handler); + } + pn_connection_ = pn_reactor_connection(container_.reactor(), chandler); + if (chandler) + pn_decref(chandler); + connection_context(pn_connection_, this); +} + +connection_impl::~connection_impl() { + delete transport_; + delete override_; +} + +transport &connection_impl::transport() { + if (transport_) + return *transport_; + throw error(MSG("connection has no transport")); +} + +handler* connection_impl::override() { return override_; } +void connection_impl::override(handler *h) { + if (override_) + delete override_; + override_ = h; +} + +void connection_impl::open() { + pn_connection_open(pn_connection_); +} + +void connection_impl::close() { + pn_connection_close(pn_connection_); +} + +pn_connection_t *connection_impl::pn_connection() { return pn_connection_; } + +std::string connection_impl::hostname() { + return std::string(pn_connection_get_hostname(pn_connection_)); +} + +connection &connection_impl::connection() { + // endpoint interface. Should be implemented in the connection object. + throw error(MSG("Internal error")); +} + +container &connection_impl::container() { return (container_); } + +void connection_impl::reactor_detach() { + // "save" goes out of scope last, preventing possible recursive destructor + // confusion with reactor_reference. + class connection save(reactor_reference_); + if (reactor_reference_) + reactor_reference_ = proton::connection(); + pn_connection_ = 0; +} + +connection &connection_impl::reactor_reference(pn_connection_t *conn) { + if (!conn) + throw error(MSG("amqp_null Proton connection")); + connection_impl *impl_ = connection_context(conn); + if (!impl_) { + // First time we have seen this connection + pn_reactor_t *reactor = pn_object_reactor(conn); + if (!reactor) + throw error(MSG("Invalid Proton connection specifier")); + class container container(container_context(reactor)); + if (!container) // can't be one created by our container + throw error(MSG("Unknown Proton connection specifier")); + impl_ = new connection_impl(container, *conn); + } + return impl_->reactor_reference_; +} + +link connection_impl::link_head(endpoint::State mask) { + return link(pn_link_head(pn_connection_, mask)); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/connection_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection_impl.hpp b/proton-c/bindings/cpp/src/connection_impl.hpp new file mode 100644 index 0000000..02c47b4 --- /dev/null +++ b/proton-c/bindings/cpp/src/connection_impl.hpp @@ -0,0 +1,74 @@ +#ifndef PROTON_CPP_CONNECTIONIMPL_H +#define PROTON_CPP_CONNECTIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/export.hpp" +#include "proton/endpoint.hpp" +#include "proton/container.hpp" +#include "proton/types.h" +#include <string> + +struct pn_connection_t; + +namespace proton { + +class handler; +class transport; +class container; + +class connection_impl : public endpoint +{ + public: + PN_CPP_EXTERN connection_impl(class container &c, pn_connection_t &pn_conn); + PN_CPP_EXTERN connection_impl(class container &c, handler *h = 0); + PN_CPP_EXTERN virtual ~connection_impl(); + PN_CPP_EXTERN class transport &transport(); + PN_CPP_EXTERN handler *override(); + PN_CPP_EXTERN void override(handler *h); + PN_CPP_EXTERN void open(); + PN_CPP_EXTERN void close(); + PN_CPP_EXTERN pn_connection_t *pn_connection(); + PN_CPP_EXTERN class container &container(); + PN_CPP_EXTERN std::string hostname(); + PN_CPP_EXTERN link link_head(endpoint::State mask); + virtual PN_CPP_EXTERN class connection &connection(); + static class connection &reactor_reference(pn_connection_t *); + static connection_impl *impl(const class connection &c) { return c.impl_; } + void reactor_detach(); + static void incref(connection_impl *); + static void decref(connection_impl *); + private: + friend class Connector; + friend class container_impl; + class container container_; + int refcount_; + handler *override_; + class transport *transport_; + pn_session_t *default_session_; // Temporary, for session_per_connection style policy. + pn_connection_t *pn_connection_; + class connection reactor_reference_; // Keep-alive reference, until PN_CONNECTION_FINAL. +}; + + +} + +#endif /*!PROTON_CPP_CONNECTIONIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/connector.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connector.cpp b/proton-c/bindings/cpp/src/connector.cpp new file mode 100644 index 0000000..2b1b935 --- /dev/null +++ b/proton-c/bindings/cpp/src/connector.cpp @@ -0,0 +1,70 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/connection.hpp" +#include "proton/transport.hpp" +#include "proton/container.hpp" +#include "proton/event.hpp" +#include "proton/connection.h" +#include "connector.hpp" +#include "connection_impl.hpp" +#include "url.hpp" + +namespace proton { + +Connector::Connector(connection &c) : connection_(c), transport_(0) {} + +Connector::~Connector() {} + +void Connector::address(const std::string &a) { + address_ = a; +} + +void Connector::connect() { + pn_connection_t *conn = connection_.pn_connection(); + pn_connection_set_container(conn, connection_.container().container_id().c_str()); + Url url(address_); + std::string hostname = url.host() + ":" + url.port(); + pn_connection_set_hostname(conn, hostname.c_str()); + transport_ = new transport(); + transport_->bind(connection_); + connection_.impl_->transport_ = transport_; +} + + +void Connector::on_connection_local_open(event &e) { + connect(); +} + +void Connector::on_connection_remote_open(event &e) {} + +void Connector::on_connection_init(event &e) { +} + +void Connector::on_transport_closed(event &e) { + // TODO: prepend with reconnect logic + pn_connection_release(connection_.impl_->pn_connection_); + // No more interaction, so drop our counted reference. + connection_ = connection(); +} + + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/connector.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connector.hpp b/proton-c/bindings/cpp/src/connector.hpp new file mode 100644 index 0000000..29e3326 --- /dev/null +++ b/proton-c/bindings/cpp/src/connector.hpp @@ -0,0 +1,58 @@ +#ifndef PROTON_CPP_CONNECTOR_HANDLER_H +#define PROTON_CPP_CONNECTOR_HANDLER_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/proton_handler.hpp" +#include "proton/event.h" +#include "proton/reactor.h" +#include <string> + + +namespace proton { + +class event; +class connection; +class transport; + +class Connector : public proton_handler +{ + public: + Connector(connection &c); + ~Connector(); + void address(const std::string &host); + void connect(); + virtual void on_connection_local_open(event &e); + virtual void on_connection_remote_open(event &e); + virtual void on_connection_init(event &e); + virtual void on_transport_closed(event &e); + + private: + connection connection_; + std::string address_; + transport *transport_; +}; + + +} + +#endif /*!PROTON_CPP_CONNECTOR_HANDLER_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/container.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container.cpp b/proton-c/bindings/cpp/src/container.cpp new file mode 100644 index 0000000..750300b --- /dev/null +++ b/proton-c/bindings/cpp/src/container.cpp @@ -0,0 +1,96 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/container.hpp" +#include "proton/messaging_event.hpp" +#include "proton/connection.hpp" +#include "proton/session.hpp" +#include "proton/messaging_adapter.hpp" +#include "proton/acceptor.hpp" +#include "proton/error.hpp" +#include "container_impl.hpp" +#include "private_impl_ref.hpp" + +#include "connector.hpp" +#include "contexts.hpp" +#include "url.hpp" + +#include "proton/connection.h" +#include "proton/session.h" + +namespace proton { + +template class handle<container_impl>; +typedef private_impl_ref<container> PI; + +container::container(container_impl* p) { PI::ctor(*this, p); } +container::container(const container& c) : handle<container_impl>() { PI::copy(*this, c); } +container& container::operator=(const container& c) { return PI::assign(*this, c); } +container::~container() { PI::dtor(*this); } + +container::container(messaging_handler &mhandler) { + container_impl *cimpl = new container_impl(mhandler); + PI::ctor(*this, cimpl); +} + +container::container() { + container_impl *cimpl = new container_impl(); + PI::ctor(*this, cimpl); +} + +connection container::connect(std::string &host, handler *h) { return impl_->connect(host, h); } + +pn_reactor_t *container::reactor() { return impl_->reactor(); } + +std::string container::container_id() { return impl_->container_id(); } + +duration container::timeout() { return impl_->timeout(); } +void container::timeout(duration timeout) { impl_->timeout(timeout); } + + +sender container::create_sender(connection &connection, std::string &addr, handler *h) { + return impl_->create_sender(connection, addr, h); +} + +sender container::create_sender(std::string &url_string) { + return impl_->create_sender(url_string); +} + +receiver container::create_receiver(connection &connection, std::string &addr) { + return impl_->create_receiver(connection, addr); +} + +receiver container::create_receiver(const std::string &url) { + return impl_->create_receiver(url); +} + +acceptor container::listen(const std::string &url_string) { + return impl_->listen(url_string); +} + + +void container::run() { impl_->run(); } +void container::start() { impl_->start(); } +bool container::process() { return impl_->process(); } +void container::stop() { impl_->stop(); } +void container::wakeup() { impl_->wakeup(); } +bool container::is_quiesced() { return impl_->is_quiesced(); } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/container_impl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_impl.cpp b/proton-c/bindings/cpp/src/container_impl.cpp new file mode 100644 index 0000000..3ce6e25 --- /dev/null +++ b/proton-c/bindings/cpp/src/container_impl.cpp @@ -0,0 +1,361 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/container.hpp" +#include "proton/messaging_event.hpp" +#include "proton/connection.hpp" +#include "proton/session.hpp" +#include "proton/messaging_adapter.hpp" +#include "proton/acceptor.hpp" +#include "proton/error.hpp" + +#include "msg.hpp" +#include "container_impl.hpp" +#include "connection_impl.hpp" +#include "connector.hpp" +#include "contexts.hpp" +#include "url.hpp" +#include "private_impl_ref.hpp" + +#include "proton/connection.h" +#include "proton/session.h" +#include "proton/handlers.h" + +namespace proton { + +namespace { + +connection_impl *impl(const connection &c) { + return private_impl_ref<connection>::get(c); +} + +} // namespace + + +class CHandler : public handler +{ + public: + CHandler(pn_handler_t *h) : pn_handler_(h) { + pn_incref(pn_handler_); + } + ~CHandler() { + pn_decref(pn_handler_); + } + pn_handler_t *pn_handler() { return pn_handler_; } + + virtual void on_unhandled(event &e) { + proton_event *pne = dynamic_cast<proton_event *>(&e); + if (!pne) return; + int type = pne->type(); + if (!type) return; // Not from the reactor + pn_handler_dispatch(pn_handler_, pne->pn_event(), (pn_event_type_t) type); + } + + private: + pn_handler_t *pn_handler_; +}; + + +// Used to sniff for Connector events before the reactor's global handler sees them. +class override_handler : public handler +{ + public: + pn_handler_t *base_handler; + + override_handler(pn_handler_t *h) : base_handler(h) { + pn_incref(base_handler); + } + ~override_handler() { + pn_decref(base_handler); + } + + + virtual void on_unhandled(event &e) { + proton_event *pne = dynamic_cast<proton_event *>(&e); + // If not a Proton reactor event, nothing to override, nothing to pass along. + if (!pne) return; + int type = pne->type(); + if (!type) return; // Also not from the reactor + + pn_event_t *cevent = pne->pn_event(); + pn_connection_t *conn = pn_event_connection(cevent); + if (conn && type != PN_CONNECTION_INIT) { + // send to override handler first + connection_impl *connection = connection_context(conn); + if (connection) { + handler *override = connection->override(); + if (override) { + e.dispatch(*override); + } + } + } + + pn_handler_dispatch(base_handler, cevent, (pn_event_type_t) type); + + if (conn && type == PN_CONNECTION_FINAL) { + // TODO: this must be the last action of the last handler looking at + // connection events. Better: generate a custom FINAL event (or task). Or move to + // separate event streams per connection as part of multi threading support. + connection_impl *cimpl = connection_context(conn); + if (cimpl) + cimpl->reactor_detach(); + // TODO: remember all connections and do reactor_detach of zombie connections + // not yet pn_connection_release'd at PN_REACTOR_FINAL. + } + } +}; + + +namespace { + +// TODO: configurable policy. session_per_connection for now. +session default_session(pn_connection_t *conn, pn_session_t **ses) { + if (!*ses) { + *ses = pn_session(conn); + pn_session_open(*ses); + } + return session(*ses); +} + +struct inbound_context { + static inbound_context* get(pn_handler_t* h) { + return reinterpret_cast<inbound_context*>(pn_handler_mem(h)); + } + container_impl *container_impl_; + handler *cpp_handler_; +}; + +void cpp_handler_dispatch(pn_handler_t *c_handler, pn_event_t *cevent, pn_event_type_t type) +{ + // Ref counted per event, but when is the last event if stop() never called? + container c(inbound_context::get(c_handler)->container_impl_); + messaging_event mevent(cevent, type, c); + mevent.dispatch(*inbound_context::get(c_handler)->cpp_handler_); +} + +void cpp_handler_cleanup(pn_handler_t *c_handler) +{ +} + +pn_handler_t *cpp_handler(container_impl *c, handler *h) +{ + pn_handler_t *handler = pn_handler_new(cpp_handler_dispatch, sizeof(struct inbound_context), cpp_handler_cleanup); + inbound_context *ctxt = inbound_context::get(handler); + ctxt->container_impl_ = c; + ctxt->cpp_handler_ = h; + return handler; +} + + +} // namespace + + +void container_impl::incref(container_impl *impl_) { + impl_->refcount_++; +} + +void container_impl::decref(container_impl *impl_) { + impl_->refcount_--; + if (impl_->refcount_ == 0) + delete impl_; +} + +container_impl::container_impl(handler &h) : + reactor_(0), handler_(&h), messaging_adapter_(0), + override_handler_(0), flow_controller_(0), container_id_(), + refcount_(0) +{} + +container_impl::container_impl() : + reactor_(0), handler_(0), messaging_adapter_(0), + override_handler_(0), flow_controller_(0), container_id_(), + refcount_(0) +{} + +container_impl::~container_impl() { + delete override_handler_; + delete flow_controller_; + delete messaging_adapter_; + pn_reactor_free(reactor_); +} + +connection container_impl::connect(std::string &host, handler *h) { + if (!reactor_) throw error(MSG("container not started")); + container cntnr(this); + connection connection(cntnr, handler_); + Connector *connector = new Connector(connection); + // Connector self-deletes depending on reconnect logic + connector->address(host); // TODO: url vector + connection.override(connector); + connection.open(); + return connection; +} + +pn_reactor_t *container_impl::reactor() { return reactor_; } + + +std::string container_impl::container_id() { return container_id_; } + +duration container_impl::timeout() { + pn_millis_t tmo = pn_reactor_get_timeout(reactor_); + if (tmo == PN_MILLIS_MAX) + return duration::FOREVER; + return duration(tmo); +} + +void container_impl::timeout(duration timeout) { + if (timeout == duration::FOREVER || timeout.milliseconds > PN_MILLIS_MAX) + pn_reactor_set_timeout(reactor_, PN_MILLIS_MAX); + else { + pn_millis_t tmo = timeout.milliseconds; + pn_reactor_set_timeout(reactor_, tmo); + } +} + + +sender container_impl::create_sender(connection &connection, std::string &addr, handler *h) { + if (!reactor_) throw error(MSG("container not started")); + session session = default_session(connection.pn_connection(), &impl(connection)->default_session_); + sender snd = session.create_sender(container_id_ + '-' + addr); + pn_link_t *lnk = snd.pn_link(); + pn_terminus_set_address(pn_link_target(lnk), addr.c_str()); + if (h) { + pn_record_t *record = pn_link_attachments(lnk); + pn_record_set_handler(record, wrap_handler(h)); + } + snd.open(); + return snd; +} + +sender container_impl::create_sender(std::string &url_string) { + if (!reactor_) throw error(MSG("container not started")); + connection conn = connect(url_string, 0); + session session = default_session(conn.pn_connection(), &impl(conn)->default_session_); + std::string path = Url(url_string).path(); + sender snd = session.create_sender(container_id_ + '-' + path); + pn_terminus_set_address(pn_link_target(snd.pn_link()), path.c_str()); + snd.open(); + return snd; +} + +receiver container_impl::create_receiver(connection &connection, std::string &addr) { + if (!reactor_) throw error(MSG("container not started")); + connection_impl *conn_impl = impl(connection); + session session = default_session(conn_impl->pn_connection_, &conn_impl->default_session_); + receiver rcv = session.create_receiver(container_id_ + '-' + addr); + pn_terminus_set_address(pn_link_source(rcv.pn_link()), addr.c_str()); + rcv.open(); + return rcv; +} + +receiver container_impl::create_receiver(const std::string &url_string) { + if (!reactor_) throw error(MSG("container not started")); + // TODO: const cleanup of API + connection conn = connect(const_cast<std::string &>(url_string), 0); + session session = default_session(conn.pn_connection(), &impl(conn)->default_session_); + std::string path = Url(url_string).path(); + receiver rcv = session.create_receiver(container_id_ + '-' + path); + pn_terminus_set_address(pn_link_source(rcv.pn_link()), path.c_str()); + rcv.open(); + return rcv; +} + +class acceptor container_impl::acceptor(const std::string &host, const std::string &port) { + pn_acceptor_t *acptr = pn_reactor_acceptor(reactor_, host.c_str(), port.c_str(), NULL); + if (acptr) + return proton::acceptor(acptr); + else + throw error(MSG("accept fail: " << pn_error_text(pn_io_error(pn_reactor_io(reactor_))) << "(" << host << ":" << port << ")")); +} + +acceptor container_impl::listen(const std::string &url_string) { + if (!reactor_) throw error(MSG("container not started")); + Url url(url_string); + // TODO: SSL + return acceptor(url.host(), url.port()); +} + + +pn_handler_t *container_impl::wrap_handler(handler *h) { + return cpp_handler(this, h); +} + + +void container_impl::initialize_reactor() { + if (reactor_) throw error(MSG("container already running")); + reactor_ = pn_reactor(); + + // Set our context on the reactor + container_context(reactor_, this); + + if (handler_) { + pn_handler_t *pn_handler = cpp_handler(this, handler_); + pn_reactor_set_handler(reactor_, pn_handler); + pn_decref(pn_handler); + } + + // Set our own global handler that "subclasses" the existing one + pn_handler_t *global_handler = pn_reactor_get_global_handler(reactor_); + override_handler_ = new override_handler(global_handler); + pn_handler_t *cpp_global_handler = cpp_handler(this, override_handler_); + pn_reactor_set_global_handler(reactor_, cpp_global_handler); + pn_decref(cpp_global_handler); + + // Note: we have just set up the following 4/5 handlers that see events in this order: + // messaging_handler (Proton C events), pn_flowcontroller (optional), messaging_adapter, + // messaging_handler (Messaging events from the messaging_adapter, i.e. the delegate), + // connector override, the reactor's default globalhandler (pn_iohandler) +} + +void container_impl::run() { + initialize_reactor(); + pn_reactor_run(reactor_); +} + +void container_impl::start() { + initialize_reactor(); + pn_reactor_start(reactor_); +} + +bool container_impl::process() { + if (!reactor_) throw error(MSG("container not started")); + bool result = pn_reactor_process(reactor_); + // TODO: check errors + return result; +} + +void container_impl::stop() { + if (!reactor_) throw error(MSG("container not started")); + pn_reactor_stop(reactor_); + // TODO: check errors +} + +void container_impl::wakeup() { + if (!reactor_) throw error(MSG("container not started")); + pn_reactor_wakeup(reactor_); + // TODO: check errors +} + +bool container_impl::is_quiesced() { + if (!reactor_) throw error(MSG("container not started")); + return pn_reactor_quiesced(reactor_); +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/container_impl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/container_impl.hpp b/proton-c/bindings/cpp/src/container_impl.hpp new file mode 100644 index 0000000..4507ab6 --- /dev/null +++ b/proton-c/bindings/cpp/src/container_impl.hpp @@ -0,0 +1,81 @@ +#ifndef PROTON_CPP_CONTAINERIMPL_H +#define PROTON_CPP_CONTAINERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/export.hpp" +#include "proton/messaging_handler.hpp" +#include "proton/connection.hpp" +#include "proton/link.hpp" +#include "proton/duration.hpp" + +#include "proton/reactor.h" + +#include <string> +namespace proton { + +class dispatch_helper; +class connection; +class connector; +class acceptor; + +class container_impl +{ + public: + PN_CPP_EXTERN container_impl(handler &h); + PN_CPP_EXTERN container_impl(); + PN_CPP_EXTERN ~container_impl(); + PN_CPP_EXTERN connection connect(std::string &host, handler *h); + PN_CPP_EXTERN void run(); + PN_CPP_EXTERN pn_reactor_t *reactor(); + PN_CPP_EXTERN sender create_sender(connection &connection, std::string &addr, handler *h); + PN_CPP_EXTERN sender create_sender(std::string &url); + PN_CPP_EXTERN receiver create_receiver(connection &connection, std::string &addr); + PN_CPP_EXTERN receiver create_receiver(const std::string &url); + PN_CPP_EXTERN class acceptor listen(const std::string &url); + PN_CPP_EXTERN std::string container_id(); + PN_CPP_EXTERN duration timeout(); + PN_CPP_EXTERN void timeout(duration timeout); + void start(); + bool process(); + void stop(); + void wakeup(); + bool is_quiesced(); + pn_handler_t *wrap_handler(handler *h); + static void incref(container_impl *); + static void decref(container_impl *); + private: + void dispatch(pn_event_t *event, pn_event_type_t type); + class acceptor acceptor(const std::string &host, const std::string &port); + void initialize_reactor(); + pn_reactor_t *reactor_; + handler *handler_; + messaging_adapter *messaging_adapter_; + handler *override_handler_; + handler *flow_controller_; + std::string container_id_; + int refcount_; +}; + + +} + +#endif /*!PROTON_CPP_CONTAINERIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/contexts.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp index fd2f3e1..98c502b 100644 --- a/proton-c/bindings/cpp/src/contexts.cpp +++ b/proton-c/bindings/cpp/src/contexts.cpp @@ -20,8 +20,8 @@ */ #include "contexts.hpp" -#include "proton/Error.hpp" -#include "Msg.hpp" +#include "proton/error.hpp" +#include "msg.hpp" #include "proton/object.h" #include "proton/message.h" #include "proton/session.h" @@ -32,44 +32,43 @@ PN_HANDLE(PNI_CPP_CONTAINER_CONTEXT) PN_HANDLE(PNI_CPP_EVENT_CONTEXT) namespace proton { -namespace reactor { -void setConnectionContext(pn_connection_t *pnConnection, ConnectionImpl *connection) { - pn_record_t *record = pn_connection_attachments(pnConnection); +void connection_context(pn_connection_t *pn_connection, connection_impl *connection) { + pn_record_t *record = pn_connection_attachments(pn_connection); pn_record_def(record, PNI_CPP_CONNECTION_CONTEXT, PN_VOID); pn_record_set(record, PNI_CPP_CONNECTION_CONTEXT, connection); } -ConnectionImpl *getConnectionContext(pn_connection_t *pnConnection) { - if (!pnConnection) return NULL; - pn_record_t *record = pn_connection_attachments(pnConnection); - ConnectionImpl *p = (ConnectionImpl *) pn_record_get(record, PNI_CPP_CONNECTION_CONTEXT); +connection_impl *connection_context(pn_connection_t *pn_connection) { + if (!pn_connection) return NULL; + pn_record_t *record = pn_connection_attachments(pn_connection); + connection_impl *p = (connection_impl *) pn_record_get(record, PNI_CPP_CONNECTION_CONTEXT); return p; } -void setContainerContext(pn_reactor_t *pnReactor, ContainerImpl *container) { - pn_record_t *record = pn_reactor_attachments(pnReactor); +void container_context(pn_reactor_t *pn_reactor, container_impl *container) { + pn_record_t *record = pn_reactor_attachments(pn_reactor); pn_record_def(record, PNI_CPP_CONTAINER_CONTEXT, PN_VOID); pn_record_set(record, PNI_CPP_CONTAINER_CONTEXT, container); } -ContainerImpl *getContainerContext(pn_reactor_t *pnReactor) { - pn_record_t *record = pn_reactor_attachments(pnReactor); - ContainerImpl *p = (ContainerImpl *) pn_record_get(record, PNI_CPP_CONTAINER_CONTEXT); - if (!p) throw Error(MSG("Reactor has no C++ container context")); +container_impl *container_context(pn_reactor_t *pn_reactor) { + pn_record_t *record = pn_reactor_attachments(pn_reactor); + container_impl *p = (container_impl *) pn_record_get(record, PNI_CPP_CONTAINER_CONTEXT); + if (!p) throw error(MSG("Reactor has no C++ container context")); return p; } -void setEventContext(pn_event_t *pnEvent, pn_message_t *m) { - pn_record_t *record = pn_event_attachments(pnEvent); +void event_context(pn_event_t *pn_event, pn_message_t *m) { + pn_record_t *record = pn_event_attachments(pn_event); pn_record_def(record, PNI_CPP_EVENT_CONTEXT, PN_OBJECT); // refcount it for life of the event pn_record_set(record, PNI_CPP_EVENT_CONTEXT, m); } -pn_message_t *getEventContext(pn_event_t *pnEvent) { - if (!pnEvent) return NULL; - pn_record_t *record = pn_event_attachments(pnEvent); +pn_message_t *event_context(pn_event_t *pn_event) { + if (!pn_event) return NULL; + pn_record_t *record = pn_event_attachments(pn_event); pn_message_t *p = (pn_message_t *) pn_record_get(record, PNI_CPP_EVENT_CONTEXT); return p; } -}} // namespace proton::reactor +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/contexts.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp index e1b5f24..4830044 100644 --- a/proton-c/bindings/cpp/src/contexts.hpp +++ b/proton-c/bindings/cpp/src/contexts.hpp @@ -26,27 +26,26 @@ #include "proton/message.h" namespace proton { -namespace reactor { -class ConnectionImpl; -void setConnectionContext(pn_connection_t *pnConnection, ConnectionImpl *connection); -ConnectionImpl *getConnectionContext(pn_connection_t *pnConnection); +class connection_impl; +void connection_context(pn_connection_t *pn_connection, connection_impl *connection); +connection_impl *connection_context(pn_connection_t *pn_connection); -class Session; -void setSessionContext(pn_session_t *pnSession, Session *session); -Session *getSessionContext(pn_session_t *pnSession); +class session; +void session_context(pn_session_t *pn_session, session *session); +session *session_context(pn_session_t *pn_session); -class Link; -void setLinkContext(pn_link_t *pnLink, Link *link); -Link *getLinkContext(pn_link_t *pnLink); +class link; +void link_context(pn_link_t *pn_link, link *link); +link *link_context(pn_link_t *pn_link); -class ContainerImpl; -void setContainerContext(pn_reactor_t *pnReactor, ContainerImpl *container); -ContainerImpl *getContainerContext(pn_reactor_t *pnReactor); +class container_impl; +void container_context(pn_reactor_t *pn_reactor, container_impl *container); +container_impl *container_context(pn_reactor_t *pn_reactor); -void setEventContext(pn_event_t *pnEvent, pn_message_t *m); -pn_message_t *getEventContext(pn_event_t *pnEvent); +void event_context(pn_event_t *pn_event, pn_message_t *m); +pn_message_t *event_context(pn_event_t *pn_event); -}} // namespace proton::reactor +} #endif /*!PROTON_CPP_CONTEXTS_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/data.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/data.cpp b/proton-c/bindings/cpp/src/data.cpp new file mode 100644 index 0000000..51a9dde --- /dev/null +++ b/proton-c/bindings/cpp/src/data.cpp @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "proton/data.hpp" +#include "proton/codec.h" +#include "proton_bits.hpp" +#include <utility> + +namespace proton { + +data::data() : data_(::pn_data(0)), own_(true) {} + +data::data(pn_data_t* p) : data_(p), own_(false) { } + +data::data(const data& x) : data_(::pn_data(0)), own_(true) { *this = x; } + +data::~data() { if (own_ && data_) ::pn_data_free(data_); } + +void data::view(pn_data_t* new_data) { + if (data_ && own_) pn_data_free(data_); + data_ = new_data; + own_ = false; +} + +void data::swap(data& x) { + std::swap(data_, x.data_); + std::swap(own_, x.own_); +} + +data& data::operator=(const data& x) { + if (this != &x) { + if (!own_) { + data_ = ::pn_data(::pn_data_size(x.data_)); + own_ = true; + } else { + clear(); + } + ::pn_data_copy(data_, x.data_); + } + return *this; +} + +void data::clear() { ::pn_data_clear(data_); } + +void data::rewind() { ::pn_data_rewind(data_); } + +bool data::empty() const { return ::pn_data_size(data_) == 0; } + +std::ostream& operator<<(std::ostream& o, const data& d) { return o << pn_object(d.data_); } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/decoder.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/decoder.cpp b/proton-c/bindings/cpp/src/decoder.cpp new file mode 100644 index 0000000..d1f17cf --- /dev/null +++ b/proton-c/bindings/cpp/src/decoder.cpp @@ -0,0 +1,329 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "proton/decoder.hpp" +#include "proton/value.hpp" +#include <proton/codec.h> +#include "proton_bits.hpp" +#include "msg.hpp" + +namespace proton { + +/**@file + * + * Note the pn_data_t "current" node is always pointing *before* the next value + * to be returned by the decoder. + * + */ +decoder::decoder() {} +decoder::decoder(const char* buffer, size_t size) { decode(buffer, size); } +decoder::decoder(const std::string& buffer) { decode(buffer); } +decoder::~decoder() {} + +static const std::string prefix("decode: "); +decode_error::decode_error(const std::string& msg) throw() : error(prefix+msg) {} + +namespace { +struct save_state { + pn_data_t* data; + pn_handle_t handle; + save_state(pn_data_t* d) : data(d), handle(pn_data_point(d)) {} + ~save_state() { if (data) pn_data_restore(data, handle); } + void cancel() { data = 0; } +}; + +struct Narrow { + pn_data_t* data; + Narrow(pn_data_t* d) : data(d) { pn_data_narrow(d); } + ~Narrow() { pn_data_widen(data); } +}; + +template <class T> T check(T result) { + if (result < 0) + throw decode_error("" + error_str(result)); + return result; +} + +} + +void decoder::decode(const char* i, size_t size) { + save_state ss(data_); + const char* end = i + size; + while (i < end) { + i += check(pn_data_decode(data_, i, end - i)); + } +} + +void decoder::decode(const std::string& buffer) { + decode(buffer.data(), buffer.size()); +} + +bool decoder::more() const { + save_state ss(data_); + return pn_data_next(data_); +} + +namespace { + +void bad_type(type_id want, type_id got) { + if (want != got) + throw decode_error("expected "+type_name(want)+" found "+type_name(got)); +} + +type_id pre_get(pn_data_t* data) { + if (!pn_data_next(data)) throw decode_error("no more data"); + type_id t = type_id(pn_data_type(data)); + if (t < 0) throw decode_error("invalid data"); + return t; +} + +// Simple extract with no type conversion. +template <class T, class U> void extract(pn_data_t* data, T& value, U (*get)(pn_data_t*)) { + save_state ss(data); + bad_type(type_idOf<T>::value, pre_get(data)); + value = get(data); + ss.cancel(); // No error, no rewind +} + +} + +void decoder::check_type(type_id want) { + type_id got = type(); + if (want != got) bad_type(want, got); +} + +type_id decoder::type() const { + save_state ss(data_); + return pre_get(data_); +} + +decoder& operator>>(decoder& d, start& s) { + save_state ss(d.data_); + s.type = pre_get(d.data_); + switch (s.type) { + case ARRAY: + s.size = pn_data_get_array(d.data_); + s.element = type_id(pn_data_get_array_type(d.data_)); + s.is_described = pn_data_is_array_described(d.data_); + break; + case LIST: + s.size = pn_data_get_list(d.data_); + break; + case MAP: + s.size = pn_data_get_map(d.data_); + break; + case DESCRIBED: + s.is_described = true; + s.size = 1; + break; + default: + throw decode_error(MSG("" << s.type << " is not a container type")); + } + pn_data_enter(d.data_); + ss.cancel(); + return d; +} + +decoder& operator>>(decoder& d, finish) { pn_data_exit(d.data_); return d; } + +decoder& operator>>(decoder& d, skip) { pn_data_next(d.data_); return d; } + +decoder& operator>>(decoder& d, rewind) { d.rewind(); return d; } + +decoder& operator>>(decoder& d, value& v) { + if (d.data_ == v.values_.data_) throw decode_error("extract into self"); + pn_data_clear(v.values_.data_); + { + Narrow n(d.data_); + check(pn_data_appendn(v.values_.data_, d.data_, 1)); + } + if (!pn_data_next(d.data_)) throw decode_error("no more data"); + return d; +} + + +decoder& operator>>(decoder& d, amqp_null) { + save_state ss(d.data_); + bad_type(NULl_, pre_get(d.data_)); + return d; +} + +decoder& operator>>(decoder& d, amqp_bool& value) { + extract(d.data_, value, pn_data_get_bool); + return d; +} + +decoder& operator>>(decoder& d, amqp_ubyte& value) { + save_state ss(d.data_); + switch (pre_get(d.data_)) { + case UBYTE: value = pn_data_get_ubyte(d.data_); break; + default: bad_type(UBYTE, type_id(type_id(pn_data_type(d.data_)))); + } + ss.cancel(); + return d; +} + +decoder& operator>>(decoder& d, amqp_byte& value) { + save_state ss(d.data_); + switch (pre_get(d.data_)) { + case BYTE: value = pn_data_get_byte(d.data_); break; + default: bad_type(BYTE, type_id(type_id(pn_data_type(d.data_)))); + } + ss.cancel(); + return d; +} + +decoder& operator>>(decoder& d, amqp_ushort& value) { + save_state ss(d.data_); + switch (pre_get(d.data_)) { + case UBYTE: value = pn_data_get_ubyte(d.data_); break; + case USHORT: value = pn_data_get_ushort(d.data_); break; + default: bad_type(USHORT, type_id(type_id(pn_data_type(d.data_)))); + } + ss.cancel(); + return d; +} + +decoder& operator>>(decoder& d, amqp_short& value) { + save_state ss(d.data_); + switch (pre_get(d.data_)) { + case BYTE: value = pn_data_get_byte(d.data_); break; + case SHORT: value = pn_data_get_short(d.data_); break; + default: bad_type(SHORT, type_id(pn_data_type(d.data_))); + } + ss.cancel(); + return d; +} + +decoder& operator>>(decoder& d, amqp_uint& value) { + save_state ss(d.data_); + switch (pre_get(d.data_)) { + case UBYTE: value = pn_data_get_ubyte(d.data_); break; + case USHORT: value = pn_data_get_ushort(d.data_); break; + case UINT: value = pn_data_get_uint(d.data_); break; + default: bad_type(UINT, type_id(pn_data_type(d.data_))); + } + ss.cancel(); + return d; +} + +decoder& operator>>(decoder& d, amqp_int& value) { + save_state ss(d.data_); + switch (pre_get(d.data_)) { + case BYTE: value = pn_data_get_byte(d.data_); break; + case SHORT: value = pn_data_get_short(d.data_); break; + case INT: value = pn_data_get_int(d.data_); break; + default: bad_type(INT, type_id(pn_data_type(d.data_))); + } + ss.cancel(); + return d; +} + +decoder& operator>>(decoder& d, amqp_ulong& value) { + save_state ss(d.data_); + switch (pre_get(d.data_)) { + case UBYTE: value = pn_data_get_ubyte(d.data_); break; + case USHORT: value = pn_data_get_ushort(d.data_); break; + case UINT: value = pn_data_get_uint(d.data_); break; + case ULONG: value = pn_data_get_ulong(d.data_); break; + default: bad_type(ULONG, type_id(pn_data_type(d.data_))); + } + ss.cancel(); + return d; +} + +decoder& operator>>(decoder& d, amqp_long& value) { + save_state ss(d.data_); + switch (pre_get(d.data_)) { + case BYTE: value = pn_data_get_byte(d.data_); break; + case SHORT: value = pn_data_get_short(d.data_); break; + case INT: value = pn_data_get_int(d.data_); break; + case LONG: value = pn_data_get_long(d.data_); break; + default: bad_type(LONG, type_id(pn_data_type(d.data_))); + } + ss.cancel(); + return d; +} + +decoder& operator>>(decoder& d, amqp_char& value) { + extract(d.data_, value, pn_data_get_char); + return d; +} + +decoder& operator>>(decoder& d, amqp_timestamp& value) { + extract(d.data_, value, pn_data_get_timestamp); + return d; +} + +decoder& operator>>(decoder& d, amqp_float& value) { + save_state ss(d.data_); + switch (pre_get(d.data_)) { + case FLOAT: value = pn_data_get_float(d.data_); break; + case DOUBLE: value = pn_data_get_double(d.data_); break; + default: bad_type(FLOAT, type_id(pn_data_type(d.data_))); + } + ss.cancel(); + return d; +} + +decoder& operator>>(decoder& d, amqp_double& value) { + save_state ss(d.data_); + switch (pre_get(d.data_)) { + case FLOAT: value = pn_data_get_float(d.data_); break; + case DOUBLE: value = pn_data_get_double(d.data_); break; + default: bad_type(DOUBLE, type_id(pn_data_type(d.data_))); + } + ss.cancel(); + return d; +} + +// TODO aconway 2015-06-11: decimal conversions. +decoder& operator>>(decoder& d, amqp_decimal32& value) { + extract(d.data_, value, pn_data_get_decimal32); + return d; +} + +decoder& operator>>(decoder& d, amqp_decimal64& value) { + extract(d.data_, value, pn_data_get_decimal64); + return d; +} + +decoder& operator>>(decoder& d, amqp_decimal128& value) { + extract(d.data_, value, pn_data_get_decimal128); + return d; +} + +decoder& operator>>(decoder& d, amqp_uuid& value) { + extract(d.data_, value, pn_data_get_uuid); + return d; +} + +decoder& operator>>(decoder& d, std::string& value) { + save_state ss(d.data_); + switch (pre_get(d.data_)) { + case STRING: value = str(pn_data_get_string(d.data_)); break; + case BINARY: value = str(pn_data_get_binary(d.data_)); break; + case SYMBOL: value = str(pn_data_get_symbol(d.data_)); break; + default: bad_type(STRING, type_id(pn_data_type(d.data_))); + } + ss.cancel(); + return d; +} + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/delivery.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/delivery.cpp b/proton-c/bindings/cpp/src/delivery.cpp new file mode 100644 index 0000000..7ff596f --- /dev/null +++ b/proton-c/bindings/cpp/src/delivery.cpp @@ -0,0 +1,57 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/delivery.hpp" +#include "proton/delivery.h" +#include "proton_impl_ref.hpp" + +namespace proton { + +template class proton_handle<pn_delivery_t>; +typedef proton_impl_ref<delivery> PI; + +delivery::delivery(pn_delivery_t *p) { + PI::ctor(*this, p); +} +delivery::delivery() { + PI::ctor(*this, 0); +} +delivery::delivery(const delivery& c) : proton_handle<pn_delivery_t>() { + PI::copy(*this, c); +} +delivery& delivery::operator=(const delivery& c) { + return PI::assign(*this, c); +} +delivery::~delivery() { + PI::dtor(*this); +} + +bool delivery::settled() { + return pn_delivery_settled(impl_); +} + +void delivery::settle() { + pn_delivery_settle(impl_); +} + +pn_delivery_t *delivery::pn_delivery() { return impl_; } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/duration.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/duration.cpp b/proton-c/bindings/cpp/src/duration.cpp new file mode 100644 index 0000000..137fb5b --- /dev/null +++ b/proton-c/bindings/cpp/src/duration.cpp @@ -0,0 +1,31 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/duration.hpp" +#include <limits> + +namespace proton { + +const duration duration::FOREVER(std::numeric_limits<std::uint64_t>::max()); +const duration duration::IMMEDIATE(0); +const duration duration::SECOND(1000); +const duration duration::MINUTE(SECOND * 60); + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/69783099/proton-c/bindings/cpp/src/encoder.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/encoder.cpp b/proton-c/bindings/cpp/src/encoder.cpp new file mode 100644 index 0000000..9c32f58 --- /dev/null +++ b/proton-c/bindings/cpp/src/encoder.cpp @@ -0,0 +1,142 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +#include "proton/encoder.hpp" +#include "proton/value.hpp" +#include <proton/codec.h> +#include "proton_bits.hpp" +#include "msg.hpp" + +namespace proton { + +encoder::encoder() {} +encoder::~encoder() {} + +static const std::string prefix("encode: "); +encode_error::encode_error(const std::string& msg) throw() : error(prefix+msg) {} + +namespace { +struct save_state { + pn_data_t* data; + pn_handle_t handle; + save_state(pn_data_t* d) : data(d), handle(pn_data_point(d)) {} + ~save_state() { if (data) pn_data_restore(data, handle); } + void cancel() { data = 0; } +}; + +void check(int result, pn_data_t* data) { + if (result < 0) + throw encode_error(error_str(pn_data_error(data), result)); +} +} + +bool encoder::encode(char* buffer, size_t& size) { + save_state ss(data_); // In case of error + ssize_t result = pn_data_encode(data_, buffer, size); + if (result == PN_OVERFLOW) { + result = pn_data_encoded_size(data_); + if (result >= 0) { + size = result; + return false; + } + } + check(result, data_); + size = result; + ss.cancel(); // Don't restore state, all is well. + pn_data_clear(data_); + return true; +} + +void encoder::encode(std::string& s) { + size_t size = s.size(); + if (!encode(&s[0], size)) { + s.resize(size); + encode(&s[0], size); + } +} + +std::string encoder::encode() { + std::string s; + encode(s); + return s; +} + +encoder& operator<<(encoder& e, const start& s) { + switch (s.type) { + case ARRAY: pn_data_put_array(e.data_, s.is_described, pn_type_t(s.element)); break; + case MAP: pn_data_put_map(e.data_); break; + case LIST: pn_data_put_list(e.data_); break; + case DESCRIBED: pn_data_put_described(e.data_); break; + default: + throw encode_error(MSG("" << s.type << " is not a container type")); + } + pn_data_enter(e.data_); + return e; +} + +encoder& operator<<(encoder& e, finish) { + pn_data_exit(e.data_); + return e; +} + +namespace { +template <class T, class U> +encoder& insert(encoder& e, pn_data_t* data, T& value, int (*put)(pn_data_t*, U)) { + save_state ss(data); // Save state in case of error. + check(put(data, value), data); + ss.cancel(); // Don't restore state, all is good. + return e; +} +} + +encoder& operator<<(encoder& e, amqp_null) { pn_data_put_null(e.data_); return e; } +encoder& operator<<(encoder& e, amqp_bool value) { return insert(e, e.data_, value, pn_data_put_bool); } +encoder& operator<<(encoder& e, amqp_ubyte value) { return insert(e, e.data_, value, pn_data_put_ubyte); } +encoder& operator<<(encoder& e, amqp_byte value) { return insert(e, e.data_, value, pn_data_put_byte); } +encoder& operator<<(encoder& e, amqp_ushort value) { return insert(e, e.data_, value, pn_data_put_ushort); } +encoder& operator<<(encoder& e, amqp_short value) { return insert(e, e.data_, value, pn_data_put_short); } +encoder& operator<<(encoder& e, amqp_uint value) { return insert(e, e.data_, value, pn_data_put_uint); } +encoder& operator<<(encoder& e, amqp_int value) { return insert(e, e.data_, value, pn_data_put_int); } +encoder& operator<<(encoder& e, amqp_char value) { return insert(e, e.data_, value, pn_data_put_char); } +encoder& operator<<(encoder& e, amqp_ulong value) { return insert(e, e.data_, value, pn_data_put_ulong); } +encoder& operator<<(encoder& e, amqp_long value) { return insert(e, e.data_, value, pn_data_put_long); } +encoder& operator<<(encoder& e, amqp_timestamp value) { return insert(e, e.data_, value, pn_data_put_timestamp); } +encoder& operator<<(encoder& e, amqp_float value) { return insert(e, e.data_, value, pn_data_put_float); } +encoder& operator<<(encoder& e, amqp_double value) { return insert(e, e.data_, value, pn_data_put_double); } +encoder& operator<<(encoder& e, amqp_decimal32 value) { return insert(e, e.data_, value, pn_data_put_decimal32); } +encoder& operator<<(encoder& e, amqp_decimal64 value) { return insert(e, e.data_, value, pn_data_put_decimal64); } +encoder& operator<<(encoder& e, amqp_decimal128 value) { return insert(e, e.data_, value, pn_data_put_decimal128); } +encoder& operator<<(encoder& e, amqp_uuid value) { return insert(e, e.data_, value, pn_data_put_uuid); } +encoder& operator<<(encoder& e, amqp_string value) { return insert(e, e.data_, value, pn_data_put_string); } +encoder& operator<<(encoder& e, amqp_symbol value) { return insert(e, e.data_, value, pn_data_put_symbol); } +encoder& operator<<(encoder& e, amqp_binary value) { return insert(e, e.data_, value, pn_data_put_binary); } + +encoder& operator<<(encoder& e, const value& v) { + if (e.data_ == v.values_.data_) throw encode_error("cannot insert into self"); + check(pn_data_appendn(e.data_, v.values_.data_, 1), e.data_); + return e; +} + +encoder& operator<<(encoder& e, const values& v) { + if (e.data_ == v.data_) throw encode_error("cannot insert into self"); + check(pn_data_append(e.data_, v.data_), e.data_); + return e; +} + +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
