http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/include/proton/types.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/types.hpp b/proton-c/bindings/cpp/include/proton/types.hpp index 62b2f53..d3cdd27 100644 --- a/proton-c/bindings/cpp/include/proton/types.hpp +++ b/proton-c/bindings/cpp/include/proton/types.hpp @@ -19,14 +19,33 @@ * under the License. */ +#include "proton/export.hpp" #include <proton/codec.h> -#include "proton/ImportExport.hpp" #include <algorithm> #include <bitset> #include <string> -#include <stdint.h> #include <memory.h> +// Workaround for older C++ compilers +#if defined(__cplusplus) && __cplusplus >= 201100 +#include <cstdint> + +#else // Workaround for older C++ compilers + +#include <proton/type_compat.h> +namespace std { +// Exact-size integer types. +using ::int8_t; +using ::int16_t; +using ::int32_t; +using ::int64_t; +using ::uint8_t; +using ::uint16_t; +using ::uint32_t; +using ::uint64_t; +} +#endif + /**@file * C++ types representing AMQP types. * @ingroup cpp @@ -34,7 +53,6 @@ namespace proton { - /** TypeId identifies an AMQP type */ enum TypeId { NULL_=PN_NULL, ///< The null type, contains no data. 
@@ -84,26 +102,26 @@ template<class T> bool operator!=(const Comparable<T>& a, const Comparable<T>& b */ struct Null {}; typedef bool Bool; -typedef uint8_t Ubyte; -typedef int8_t Byte; -typedef uint16_t Ushort; -typedef int16_t Short; -typedef uint32_t Uint; -typedef int32_t Int; +typedef std::uint8_t Ubyte; +typedef std::int8_t Byte; +typedef std::uint16_t Ushort; +typedef std::int16_t Short; +typedef std::uint32_t Uint; +typedef std::int32_t Int; typedef wchar_t Char; -typedef uint64_t Ulong; -typedef int64_t Long; +typedef std::uint64_t Ulong; +typedef std::int64_t Long; typedef float Float; typedef double Double; ///@internal -pn_bytes_t pn_bytes(const std::string&); +PN_CPP_EXTERN pn_bytes_t pn_bytes(const std::string&); //@internal -std::string str(const pn_bytes_t& b); +PN_CPP_EXTERN std::string str(const pn_bytes_t& b); ///@internal #define STRING_LIKE(NAME) \ - PN_CPP_EXTERN struct NAME : public std::string{ \ + struct NAME : public std::string{ \ NAME(const std::string& s=std::string()) : std::string(s) {} \ NAME(const char* s) : std::string(s) {} \ NAME(const pn_bytes_t& b) : std::string(b.start, b.size) {} \ @@ -117,36 +135,53 @@ STRING_LIKE(Symbol); /** Binary data */ STRING_LIKE(Binary); -///@internal -pn_uuid_t pn_uuid(const std::string&); - -/** UUID is represented as a string but treated as if it always has 16 bytes. */ -PN_CPP_EXTERN struct Uuid : public std::string{ - Uuid(const std::string& s=std::string()) : std::string(s) {} - Uuid(const pn_uuid_t& u) : std::string(&u.bytes[0], sizeof(pn_uuid_t::bytes)) {} - operator pn_uuid_t() const { return pn_uuid(*this); } -}; - // TODO aconway 2015-06-11: alternative representation of variable-length data -// as pointer to existing buffers. +// as pointer to existing buffer. 
+ +/** Array of 16 bytes representing a UUID */ +struct Uuid : public Comparable<Uuid> { // FIXME aconway 2015-06-18: std::array in C++11 + public: + static const size_t SIZE = 16; + + PN_CPP_EXTERN Uuid(); + PN_CPP_EXTERN Uuid(const pn_uuid_t& u); + PN_CPP_EXTERN operator pn_uuid_t() const; + PN_CPP_EXTERN bool operator==(const Uuid&) const; + PN_CPP_EXTERN bool operator<(const Uuid&) const; + + char* begin() { return bytes; } + const char* begin() const { return bytes; } + char* end() { return bytes + SIZE; } + const char* end() const { return bytes + SIZE; } + char& operator[](size_t i) { return bytes[i]; } + const char& operator[](size_t i) const { return bytes[i]; } + size_t size() const { return SIZE; } + + // Human-readable representation. + friend PN_CPP_EXTERN std::ostream& operator<<(std::ostream&, const Uuid&); + private: + char bytes[SIZE]; +}; // TODO aconway 2015-06-16: usable representation of decimal types. +/**@internal*/ template <class T> struct Decimal : public Comparable<Decimal<T> > { char value[sizeof(T)]; Decimal() { ::memset(value, 0, sizeof(T)); } Decimal(const T& v) { ::memcpy(value, &v, sizeof(T)); } - operator T() const { return *reinterpret_cast<const T*>(value); } + operator T() const { T x; ::memcpy(&x, value, sizeof(T)); return x; } bool operator<(const Decimal<T>& x) { return std::lexicographical_compare(value, value+sizeof(T), x.value, x.value+sizeof(T)); } }; + typedef Decimal<pn_decimal32_t> Decimal32; typedef Decimal<pn_decimal64_t> Decimal64; typedef Decimal<pn_decimal128_t> Decimal128; -PN_CPP_EXTERN struct Timestamp : public Comparable<Timestamp> { +struct Timestamp : public Comparable<Timestamp> { pn_timestamp_t milliseconds; ///< Since the epoch 00:00:00 (UTC), 1 January 1970. 
- Timestamp(int64_t ms=0) : milliseconds(ms) {} + Timestamp(std::int64_t ms=0) : milliseconds(ms) {} operator pn_timestamp_t() const { return milliseconds; } bool operator==(const Timestamp& x) { return milliseconds == x.milliseconds; } bool operator<(const Timestamp& x) { return milliseconds < x.milliseconds; } @@ -154,29 +189,6 @@ PN_CPP_EXTERN struct Timestamp : public Comparable<Timestamp> { ///@} -template <class T> struct TypeIdOf {}; -template<> struct TypeIdOf<Null> { static const TypeId value=NULL_; }; -template<> struct TypeIdOf<Bool> { static const TypeId value=BOOL; }; -template<> struct TypeIdOf<Ubyte> { static const TypeId value=UBYTE; }; -template<> struct TypeIdOf<Byte> { static const TypeId value=BYTE; }; -template<> struct TypeIdOf<Ushort> { static const TypeId value=USHORT; }; -template<> struct TypeIdOf<Short> { static const TypeId value=SHORT; }; -template<> struct TypeIdOf<Uint> { static const TypeId value=UINT; }; -template<> struct TypeIdOf<Int> { static const TypeId value=INT; }; -template<> struct TypeIdOf<Char> { static const TypeId value=CHAR; }; -template<> struct TypeIdOf<Ulong> { static const TypeId value=ULONG; }; -template<> struct TypeIdOf<Long> { static const TypeId value=LONG; }; -template<> struct TypeIdOf<Timestamp> { static const TypeId value=TIMESTAMP; }; -template<> struct TypeIdOf<Float> { static const TypeId value=FLOAT; }; -template<> struct TypeIdOf<Double> { static const TypeId value=DOUBLE; }; -template<> struct TypeIdOf<Decimal32> { static const TypeId value=DECIMAL32; }; -template<> struct TypeIdOf<Decimal64> { static const TypeId value=DECIMAL64; }; -template<> struct TypeIdOf<Decimal128> { static const TypeId value=DECIMAL128; }; -template<> struct TypeIdOf<Uuid> { static const TypeId value=UUID; }; -template<> struct TypeIdOf<Binary> { static const TypeId value=BINARY; }; -template<> struct TypeIdOf<String> { static const TypeId value=STRING; }; -template<> struct TypeIdOf<Symbol> { static const TypeId 
value=SYMBOL; }; - template<class T, TypeId A> struct TypePair { typedef T CppType; TypeId type; @@ -193,6 +205,20 @@ template<class T, TypeId A> struct CRef : public TypePair<T, A> { const T& value; }; +/** A holder for AMQP values. A holder is always encoded/decoded as its AmqpValue, no need + * for the as<TYPE>() helper functions. + * + * For example to encode an array of arrays using std::vector: + * + * typedef Holder<std::vector<String>, ARRAY> Inner; + * typedef Holder<std::vector<Inner>, ARRAY> Outer; + * Outer o ... + * encoder << o; + */ +template<class T, TypeId A> struct Holder : public TypePair<T, A> { + T value; +}; + /** Create a reference to value as AMQP type A for decoding. For example to decode an array of Int: * * std::vector<Int> v; @@ -213,9 +239,6 @@ PN_CPP_EXTERN std::string typeName(TypeId); /** Print the name of a type */ PN_CPP_EXTERN std::ostream& operator<<(std::ostream&, TypeId); -/** Return the name of a type from a class. */ -PN_CPP_EXTERN template<class T> std::string typeName() { return typeName(TypeIdOf<T>::value); } - /** Information needed to start extracting or inserting a container type. * * With a decoder you can use `Start s = decoder.start()` or `Start s; decoder > s` @@ -224,29 +247,29 @@ PN_CPP_EXTERN template<class T> std::string typeName() { return typeName(TypeIdO * With an encoder use one of the member functions startArray, startList, startMap or startDescribed * to create an appropriate Start value, e.g. `encoder << startList() << ...` */ -PN_CPP_EXTERN struct Start { - Start(TypeId type=NULL_, TypeId element=NULL_, bool described=false, size_t size=0); +struct Start { + PN_CPP_EXTERN Start(TypeId type=NULL_, TypeId element=NULL_, bool described=false, size_t size=0); TypeId type; ///< The container type: ARRAY, LIST, MAP or DESCRIBED. TypeId element; ///< the element type for array only. bool isDescribed; ///< true if first value is a descriptor. 
size_t size; ///< the element count excluding the descriptor (if any) /** Return a Start for an array */ - static Start array(TypeId element, bool described=false); + PN_CPP_EXTERN static Start array(TypeId element, bool described=false); /** Return a Start for a list */ - static Start list(); + PN_CPP_EXTERN static Start list(); /** Return a Start for a map */ - static Start map(); + PN_CPP_EXTERN static Start map(); /** Return a Start for a described type */ - static Start described(); + PN_CPP_EXTERN static Start described(); }; /** Finish inserting or extracting a container value. */ -PN_CPP_EXTERN struct Finish {}; +struct Finish {}; inline Finish finish() { return Finish(); } /** Skip a value */ -PN_CPP_EXTERN struct Skip{}; +struct Skip{}; inline Skip skip() { return Skip(); } }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/BlockingConnection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/BlockingConnection.cpp b/proton-c/bindings/cpp/src/BlockingConnection.cpp new file mode 100644 index 0000000..3e57b91 --- /dev/null +++ b/proton-c/bindings/cpp/src/BlockingConnection.cpp @@ -0,0 +1,62 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +#include "proton/Container.hpp" +#include "proton/BlockingConnection.hpp" +#include "proton/BlockingSender.hpp" +#include "proton/MessagingHandler.hpp" +#include "proton/Error.hpp" +#include "Msg.hpp" +#include "BlockingConnectionImpl.hpp" +#include "PrivateImplRef.hpp" + +namespace proton { +namespace reactor { + +template class Handle<BlockingConnectionImpl>; +typedef PrivateImplRef<BlockingConnection> PI; + +BlockingConnection::BlockingConnection() {PI::ctor(*this, 0); } + +BlockingConnection::BlockingConnection(const BlockingConnection& c) : Handle<BlockingConnectionImpl>() { PI::copy(*this, c); } + +BlockingConnection& BlockingConnection::operator=(const BlockingConnection& c) { return PI::assign(*this, c); } +BlockingConnection::~BlockingConnection() { PI::dtor(*this); } + +BlockingConnection::BlockingConnection(std::string &url, Duration d, SslDomain *ssld, Container *c) { + BlockingConnectionImpl *cimpl = new BlockingConnectionImpl(url, d,ssld, c); + PI::ctor(*this, cimpl); +} + +void BlockingConnection::close() { impl->close(); } + +void BlockingConnection::wait(WaitCondition &cond) { return impl->wait(cond); } +void BlockingConnection::wait(WaitCondition &cond, std::string &msg, Duration timeout) { + return impl->wait(cond, msg, timeout); +} + +BlockingSender BlockingConnection::createSender(std::string &address, Handler *h) { + Sender sender = impl->container.createSender(impl->connection, address, h); + return BlockingSender(*this, sender); +} + +Duration BlockingConnection::getTimeout() { return impl->getTimeout(); } + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/BlockingConnectionImpl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/BlockingConnectionImpl.cpp b/proton-c/bindings/cpp/src/BlockingConnectionImpl.cpp new file mode 100644 index 0000000..912f11f --- /dev/null +++ 
b/proton-c/bindings/cpp/src/BlockingConnectionImpl.cpp @@ -0,0 +1,124 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/Container.hpp" +#include "proton/MessagingHandler.hpp" +#include "proton/Duration.hpp" +#include "proton/Error.hpp" +#include "proton/WaitCondition.hpp" +#include "BlockingConnectionImpl.hpp" +#include "Msg.hpp" +#include "contexts.hpp" + +#include "proton/connection.h" + +namespace proton { +namespace reactor { + +WaitCondition::~WaitCondition() {} + + +void BlockingConnectionImpl::incref(BlockingConnectionImpl *impl) { + impl->refCount++; +} + +void BlockingConnectionImpl::decref(BlockingConnectionImpl *impl) { + impl->refCount--; + if (impl->refCount == 0) + delete impl; +} + +namespace { +struct ConnectionOpening : public WaitCondition { + ConnectionOpening(pn_connection_t *c) : pnConnection(c) {} + bool achieved() { return !(pn_connection_state(pnConnection) & PN_REMOTE_UNINIT); } // achieved once the remote end has left UNINIT + pn_connection_t *pnConnection; +}; + +struct ConnectionClosed : public WaitCondition { + ConnectionClosed(pn_connection_t *c) : pnConnection(c) {} + bool achieved() { return !(pn_connection_state(pnConnection) & PN_REMOTE_ACTIVE); } + pn_connection_t *pnConnection; +}; + +} + + 
+BlockingConnectionImpl::BlockingConnectionImpl(std::string &u, Duration timeout0, SslDomain *ssld, Container *c) + : url(u), timeout(timeout0), refCount(0) +{ + if (c) + container = *c; + container.start(); + container.setTimeout(timeout); + // Create connection and send the connection events here + connection = container.connect(url, static_cast<Handler *>(this)); + ConnectionOpening cond(connection.getPnConnection()); + wait(cond); +} + +BlockingConnectionImpl::~BlockingConnectionImpl() { + container = Container(); +} + +void BlockingConnectionImpl::close() { + connection.close(); + ConnectionClosed cond(connection.getPnConnection()); + wait(cond); +} + +void BlockingConnectionImpl::wait(WaitCondition &condition) { + std::string empty; + wait(condition, empty, timeout); +} + +void BlockingConnectionImpl::wait(WaitCondition &condition, std::string &msg, Duration waitTimeout) { + if (waitTimeout == Duration::FOREVER) { + while (!condition.achieved()) { + container.process(); + } + } + + pn_reactor_t *reactor = container.getReactor(); + pn_millis_t origTimeout = pn_reactor_get_timeout(reactor); + pn_reactor_set_timeout(reactor, waitTimeout.milliseconds); + try { + pn_timestamp_t now = pn_reactor_mark(reactor); + pn_timestamp_t deadline = now + waitTimeout.milliseconds; + while (!condition.achieved()) { + container.process(); + if (deadline < pn_reactor_mark(reactor)) { + std::string txt = "Connection timed out"; + if (!msg.empty()) + txt += ": " + msg; + // TODO: proper Timeout exception + throw Error(MSG(txt)); + } + } + } catch (...) 
{ + pn_reactor_set_timeout(reactor, origTimeout); + throw; + } + pn_reactor_set_timeout(reactor, origTimeout); +} + + + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/BlockingConnectionImpl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/BlockingConnectionImpl.hpp b/proton-c/bindings/cpp/src/BlockingConnectionImpl.hpp new file mode 100644 index 0000000..2b2ef7e --- /dev/null +++ b/proton-c/bindings/cpp/src/BlockingConnectionImpl.hpp @@ -0,0 +1,63 @@ +#ifndef PROTON_CPP_BLOCKINGCONNECTIONIMPL_H +#define PROTON_CPP_BLOCKINGCONNECTIONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +#include "proton/export.hpp" +#include "proton/Endpoint.hpp" +#include "proton/Container.hpp" +#include "proton/types.h" +#include <string> + +struct pn_connection_t; + +namespace proton { +namespace reactor { + +class Handler; +class Container; +class SslDomain; + + class BlockingConnectionImpl : public MessagingHandler +{ + public: + PN_CPP_EXTERN BlockingConnectionImpl(std::string &url, Duration d, SslDomain *ssld, Container *c); + PN_CPP_EXTERN ~BlockingConnectionImpl(); + PN_CPP_EXTERN void close(); + PN_CPP_EXTERN void wait(WaitCondition &condition); + PN_CPP_EXTERN void wait(WaitCondition &condition, std::string &msg, Duration timeout); + PN_CPP_EXTERN pn_connection_t *getPnBlockingConnection(); + Duration getTimeout() { return timeout; } + static void incref(BlockingConnectionImpl *); + static void decref(BlockingConnectionImpl *); + private: + friend class BlockingConnection; + Container container; + Connection connection; + std::string url; + Duration timeout; + int refCount; +}; + + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_BLOCKINGCONNECTIONIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/BlockingLink.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/BlockingLink.cpp b/proton-c/bindings/cpp/src/BlockingLink.cpp new file mode 100644 index 0000000..afc5f35 --- /dev/null +++ b/proton-c/bindings/cpp/src/BlockingLink.cpp @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/BlockingLink.hpp" +#include "proton/BlockingConnection.hpp" +#include "proton/MessagingHandler.hpp" +#include "proton/WaitCondition.hpp" +#include "proton/Error.hpp" +#include "Msg.hpp" + + +namespace proton { +namespace reactor { + +namespace { +struct LinkOpened : public WaitCondition { + LinkOpened(pn_link_t *l) : pnLink(l) {} + bool achieved() { return !(pn_link_state(pnLink) & PN_REMOTE_UNINIT); } + pn_link_t *pnLink; +}; + +struct LinkClosed : public WaitCondition { + LinkClosed(pn_link_t *l) : pnLink(l) {} + bool achieved() { return (pn_link_state(pnLink) & PN_REMOTE_CLOSED); } + pn_link_t *pnLink; +}; + +struct LinkNotOpen : public WaitCondition { + LinkNotOpen(pn_link_t *l) : pnLink(l) {} + bool achieved() { return !(pn_link_state(pnLink) & PN_REMOTE_ACTIVE); } + pn_link_t *pnLink; +}; + + +} // namespace + + +BlockingLink::BlockingLink(BlockingConnection *c, pn_link_t *pnl) : connection(*c), link(pnl) { + std::string msg = "Opening link " + link.getName(); + LinkOpened linkOpened(link.getPnLink()); + connection.wait(linkOpened, msg); +} + +BlockingLink::~BlockingLink() {} + +void BlockingLink::waitForClosed(Duration timeout) { + std::string msg = "Closing link " + link.getName(); + LinkClosed linkClosed(link.getPnLink()); + connection.wait(linkClosed, msg, timeout); // honour the caller-supplied timeout + checkClosed(); +} + +void BlockingLink::checkClosed() { + pn_link_t * pnLink = link.getPnLink(); + if (pn_link_state(pnLink) & PN_REMOTE_CLOSED) { + link.close(); + // TODO: LinkDetached exception + throw Error(MSG("Link detached")); + } +} + +void 
BlockingLink::close() { + link.close(); + std::string msg = "Closing link " + link.getName(); + LinkNotOpen linkNotOpen(link.getPnLink()); + connection.wait(linkNotOpen, msg); +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/BlockingSender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/BlockingSender.cpp b/proton-c/bindings/cpp/src/BlockingSender.cpp new file mode 100644 index 0000000..7a24324 --- /dev/null +++ b/proton-c/bindings/cpp/src/BlockingSender.cpp @@ -0,0 +1,66 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ +#include "proton/BlockingSender.hpp" +#include "proton/BlockingConnection.hpp" +#include "proton/WaitCondition.hpp" +#include "proton/Error.hpp" +#include "Msg.hpp" + + +namespace proton { +namespace reactor { + +namespace { +struct DeliverySettled : public WaitCondition { + DeliverySettled(pn_delivery_t *d) : pnDelivery(d) {} + bool achieved() { return pn_delivery_settled(pnDelivery); } + pn_delivery_t *pnDelivery; +}; + +} // namespace + + +BlockingSender::BlockingSender(BlockingConnection &c, Sender &l) : BlockingLink(&c, l.getPnLink()) { + std::string ta = link.getTarget().getAddress(); + std::string rta = link.getRemoteTarget().getAddress(); + if (ta.empty() || ta.compare(rta) != 0) { + waitForClosed(); + link.close(); + std::string txt = "Failed to open sender " + link.getName() + ", target does not match"; + throw Error(MSG(txt)); // report the target mismatch built above + } +} + +Delivery BlockingSender::send(Message &msg, Duration timeout) { + Sender snd = link; + Delivery dlv = snd.send(msg); + std::string txt = "Sending on sender " + link.getName(); + DeliverySettled cond(dlv.getPnDelivery()); + connection.wait(cond, txt, timeout); + return dlv; +} + +Delivery BlockingSender::send(Message &msg) { + // Use default timeout + return send(msg, connection.getTimeout()); +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/ConnectionImpl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.hpp b/proton-c/bindings/cpp/src/ConnectionImpl.hpp index e20d614..442998e 100644 --- a/proton-c/bindings/cpp/src/ConnectionImpl.hpp +++ b/proton-c/bindings/cpp/src/ConnectionImpl.hpp @@ -21,7 +21,7 @@ * under the License. 
* */ -#include "proton/ImportExport.hpp" +#include "proton/export.hpp" #include "proton/Endpoint.hpp" #include "proton/Container.hpp" #include "proton/types.h" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/ContainerImpl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ContainerImpl.hpp b/proton-c/bindings/cpp/src/ContainerImpl.hpp index 80df83a..72cbefa 100644 --- a/proton-c/bindings/cpp/src/ContainerImpl.hpp +++ b/proton-c/bindings/cpp/src/ContainerImpl.hpp @@ -21,7 +21,7 @@ * under the License. * */ -#include "proton/ImportExport.hpp" +#include "proton/export.hpp" #include "proton/MessagingHandler.hpp" #include "proton/Connection.hpp" #include "proton/Link.hpp" http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/Decoder.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Decoder.cpp b/proton-c/bindings/cpp/src/Decoder.cpp index 50e6a12..707bcea 100644 --- a/proton-c/bindings/cpp/src/Decoder.cpp +++ b/proton-c/bindings/cpp/src/Decoder.cpp @@ -36,7 +36,8 @@ Decoder::Decoder(const char* buffer, size_t size) { decode(buffer, size); } Decoder::Decoder(const std::string& buffer) { decode(buffer); } Decoder::~Decoder() {} -DecodeError::DecodeError(const std::string& msg) throw() : Error("decode: "+msg) {} +static const std::string prefix("decode: "); +DecodeError::DecodeError(const std::string& msg) throw() : Error(prefix+msg) {} namespace { struct SaveState { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/Duration.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Duration.cpp b/proton-c/bindings/cpp/src/Duration.cpp index 7a11819..b14be50 100644 --- a/proton-c/bindings/cpp/src/Duration.cpp +++ b/proton-c/bindings/cpp/src/Duration.cpp @@ -23,7 +23,7 @@ 
namespace proton { -const Duration Duration::FOREVER(std::numeric_limits<uint64_t>::max()); +const Duration Duration::FOREVER(std::numeric_limits<std::uint64_t>::max()); const Duration Duration::IMMEDIATE(0); const Duration Duration::SECOND(1000); const Duration Duration::MINUTE(SECOND * 60); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/Encoder.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Encoder.cpp b/proton-c/bindings/cpp/src/Encoder.cpp index 892c338..551a21a 100644 --- a/proton-c/bindings/cpp/src/Encoder.cpp +++ b/proton-c/bindings/cpp/src/Encoder.cpp @@ -28,7 +28,8 @@ namespace proton { Encoder::Encoder() {} Encoder::~Encoder() {} -EncodeError::EncodeError(const std::string& msg) throw() : Error("encode: "+msg) {} +static const std::string prefix("encode: "); +EncodeError::EncodeError(const std::string& msg) throw() : Error(prefix+msg) {} namespace { struct SaveState { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/Error.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Error.cpp b/proton-c/bindings/cpp/src/Error.cpp index e97bf5f..4fd7f43 100644 --- a/proton-c/bindings/cpp/src/Error.cpp +++ b/proton-c/bindings/cpp/src/Error.cpp @@ -21,7 +21,9 @@ namespace proton { -Error::Error(const std::string& msg) throw() : std::runtime_error("proton: "+msg) {} +static const std::string prefix("proton: "); + +Error::Error(const std::string& msg) throw() : std::runtime_error(prefix+msg) {} MessageReject::MessageReject(const std::string& msg) throw() : Error(msg) {} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/MessagingAdapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/MessagingAdapter.cpp b/proton-c/bindings/cpp/src/MessagingAdapter.cpp 
index 3b4152d..90ad510 100644 --- a/proton-c/bindings/cpp/src/MessagingAdapter.cpp +++ b/proton-c/bindings/cpp/src/MessagingAdapter.cpp @@ -121,7 +121,7 @@ void MessagingAdapter::onDelivery(Event &e) { } else { // Sender if (pn_delivery_updated(dlv)) { - uint64_t rstate = pn_delivery_remote_state(dlv); + std::uint64_t rstate = pn_delivery_remote_state(dlv); if (rstate == PN_ACCEPTED) { MessagingEvent mevent(PN_MESSAGING_ACCEPTED, *pe); delegate.onAccepted(mevent); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/PrivateImplRef.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/PrivateImplRef.hpp b/proton-c/bindings/cpp/src/PrivateImplRef.hpp index 494fef0..560e740 100644 --- a/proton-c/bindings/cpp/src/PrivateImplRef.hpp +++ b/proton-c/bindings/cpp/src/PrivateImplRef.hpp @@ -22,7 +22,7 @@ * */ -#include "proton/ImportExport.hpp" +#include "proton/export.hpp" namespace proton { namespace reactor { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/ProtonImplRef.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ProtonImplRef.hpp b/proton-c/bindings/cpp/src/ProtonImplRef.hpp index 8621dc8..426fb6d 100644 --- a/proton-c/bindings/cpp/src/ProtonImplRef.hpp +++ b/proton-c/bindings/cpp/src/ProtonImplRef.hpp @@ -22,7 +22,7 @@ * */ -#include "proton/ImportExport.hpp" +#include "proton/export.hpp" #include "proton/object.h" namespace proton { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/Sender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Sender.cpp b/proton-c/bindings/cpp/src/Sender.cpp index 11fa3c8..3998d62 100644 --- a/proton-c/bindings/cpp/src/Sender.cpp +++ b/proton-c/bindings/cpp/src/Sender.cpp @@ -38,8 +38,7 @@ namespace proton { namespace reactor { 
-Sender::Sender(pn_link_t *lnk = 0) : Link(lnk) {} -Sender::Sender() : Link(0) {} +Sender::Sender(pn_link_t *lnk) : Link(lnk) {} void Sender::verifyType(pn_link_t *lnk) { if (lnk && pn_link_is_receiver(lnk)) @@ -51,14 +50,14 @@ Sender::Sender(const Link& c) : Link(c.getPnLink()) {} namespace{ // revisit if thread safety required -uint64_t tagCounter = 0; +std::uint64_t tagCounter = 0; } Delivery Sender::send(Message &message) { char tag[8]; void *ptr = &tag; - uint64_t id = ++tagCounter; - *((uint64_t *) ptr) = id; + std::uint64_t id = ++tagCounter; + *((std::uint64_t *) ptr) = id; pn_delivery_t *dlv = pn_delivery(getPnLink(), pn_dtag(tag, 8)); std::string buf; message.encode(buf); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/Session.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Session.cpp b/proton-c/bindings/cpp/src/Session.cpp index 57f788e..594d192 100644 --- a/proton-c/bindings/cpp/src/Session.cpp +++ b/proton-c/bindings/cpp/src/Session.cpp @@ -26,6 +26,7 @@ #include "proton/Session.hpp" #include "proton/Connection.hpp" #include "ConnectionImpl.hpp" +#include "ProtonImplRef.hpp" namespace proton { namespace reactor { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/Url.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Url.hpp b/proton-c/bindings/cpp/src/Url.hpp index 5dfb79e..3c2e450 100644 --- a/proton-c/bindings/cpp/src/Url.hpp +++ b/proton-c/bindings/cpp/src/Url.hpp @@ -21,7 +21,7 @@ * under the License. 
* */ -#include "proton/ImportExport.hpp" +#include "proton/export.hpp" #include "proton/ProtonHandle.hpp" #include "proton/url.h" #include <string> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp b/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp deleted file mode 100644 index 3e57b91..0000000 --- a/proton-c/bindings/cpp/src/blocking/BlockingConnection.cpp +++ /dev/null @@ -1,62 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ -#include "proton/Container.hpp" -#include "proton/BlockingConnection.hpp" -#include "proton/BlockingSender.hpp" -#include "proton/MessagingHandler.hpp" -#include "proton/Error.hpp" -#include "Msg.hpp" -#include "BlockingConnectionImpl.hpp" -#include "PrivateImplRef.hpp" - -namespace proton { -namespace reactor { - -template class Handle<BlockingConnectionImpl>; -typedef PrivateImplRef<BlockingConnection> PI; - -BlockingConnection::BlockingConnection() {PI::ctor(*this, 0); } - -BlockingConnection::BlockingConnection(const BlockingConnection& c) : Handle<BlockingConnectionImpl>() { PI::copy(*this, c); } - -BlockingConnection& BlockingConnection::operator=(const BlockingConnection& c) { return PI::assign(*this, c); } -BlockingConnection::~BlockingConnection() { PI::dtor(*this); } - -BlockingConnection::BlockingConnection(std::string &url, Duration d, SslDomain *ssld, Container *c) { - BlockingConnectionImpl *cimpl = new BlockingConnectionImpl(url, d,ssld, c); - PI::ctor(*this, cimpl); -} - -void BlockingConnection::close() { impl->close(); } - -void BlockingConnection::wait(WaitCondition &cond) { return impl->wait(cond); } -void BlockingConnection::wait(WaitCondition &cond, std::string &msg, Duration timeout) { - return impl->wait(cond, msg, timeout); -} - -BlockingSender BlockingConnection::createSender(std::string &address, Handler *h) { - Sender sender = impl->container.createSender(impl->connection, address, h); - return BlockingSender(*this, sender); -} - -Duration BlockingConnection::getTimeout() { return impl->getTimeout(); } - -}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp deleted file mode 100644 index 912f11f..0000000 --- 
a/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.cpp +++ /dev/null @@ -1,124 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "proton/Container.hpp" -#include "proton/MessagingHandler.hpp" -#include "proton/Duration.hpp" -#include "proton/Error.hpp" -#include "proton/WaitCondition.hpp" -#include "BlockingConnectionImpl.hpp" -#include "Msg.hpp" -#include "contexts.hpp" - -#include "proton/connection.h" - -namespace proton { -namespace reactor { - -WaitCondition::~WaitCondition() {} - - -void BlockingConnectionImpl::incref(BlockingConnectionImpl *impl) { - impl->refCount++; -} - -void BlockingConnectionImpl::decref(BlockingConnectionImpl *impl) { - impl->refCount--; - if (impl->refCount == 0) - delete impl; -} - -namespace { -struct ConnectionOpening : public WaitCondition { - ConnectionOpening(pn_connection_t *c) : pnConnection(c) {} - bool achieved() { return (pn_connection_state(pnConnection) & PN_REMOTE_UNINIT); } - pn_connection_t *pnConnection; -}; - -struct ConnectionClosed : public WaitCondition { - ConnectionClosed(pn_connection_t *c) : pnConnection(c) {} - bool achieved() { return !(pn_connection_state(pnConnection) & PN_REMOTE_ACTIVE); } - pn_connection_t 
*pnConnection; -}; - -} - - -BlockingConnectionImpl::BlockingConnectionImpl(std::string &u, Duration timeout0, SslDomain *ssld, Container *c) - : url(u), timeout(timeout0), refCount(0) -{ - if (c) - container = *c; - container.start(); - container.setTimeout(timeout); - // Create connection and send the connection events here - connection = container.connect(url, static_cast<Handler *>(this)); - ConnectionOpening cond(connection.getPnConnection()); - wait(cond); -} - -BlockingConnectionImpl::~BlockingConnectionImpl() { - container = Container(); -} - -void BlockingConnectionImpl::close() { - connection.close(); - ConnectionClosed cond(connection.getPnConnection()); - wait(cond); -} - -void BlockingConnectionImpl::wait(WaitCondition &condition) { - std::string empty; - wait(condition, empty, timeout); -} - -void BlockingConnectionImpl::wait(WaitCondition &condition, std::string &msg, Duration waitTimeout) { - if (waitTimeout == Duration::FOREVER) { - while (!condition.achieved()) { - container.process(); - } - } - - pn_reactor_t *reactor = container.getReactor(); - pn_millis_t origTimeout = pn_reactor_get_timeout(reactor); - pn_reactor_set_timeout(reactor, waitTimeout.milliseconds); - try { - pn_timestamp_t now = pn_reactor_mark(reactor); - pn_timestamp_t deadline = now + waitTimeout.milliseconds; - while (!condition.achieved()) { - container.process(); - if (deadline < pn_reactor_mark(reactor)) { - std::string txt = "Connection timed out"; - if (!msg.empty()) - txt += ": " + msg; - // TODO: proper Timeout exception - throw Error(MSG(txt)); - } - } - } catch (...) 
{ - pn_reactor_set_timeout(reactor, origTimeout); - throw; - } - pn_reactor_set_timeout(reactor, origTimeout); -} - - - -}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.hpp b/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.hpp deleted file mode 100644 index 989a317..0000000 --- a/proton-c/bindings/cpp/src/blocking/BlockingConnectionImpl.hpp +++ /dev/null @@ -1,63 +0,0 @@ -#ifndef PROTON_CPP_CONNECTIONIMPL_H -#define PROTON_CPP_CONNECTIONIMPL_H - -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ -#include "proton/ImportExport.hpp" -#include "proton/Endpoint.hpp" -#include "proton/Container.hpp" -#include "proton/types.h" -#include <string> - -struct pn_connection_t; - -namespace proton { -namespace reactor { - -class Handler; -class Container; -class SslDomain; - - class BlockingConnectionImpl : public MessagingHandler -{ - public: - PN_CPP_EXTERN BlockingConnectionImpl(std::string &url, Duration d, SslDomain *ssld, Container *c); - PN_CPP_EXTERN ~BlockingConnectionImpl(); - PN_CPP_EXTERN void close(); - PN_CPP_EXTERN void wait(WaitCondition &condition); - PN_CPP_EXTERN void wait(WaitCondition &condition, std::string &msg, Duration timeout); - PN_CPP_EXTERN pn_connection_t *getPnBlockingConnection(); - Duration getTimeout() { return timeout; } - static void incref(BlockingConnectionImpl *); - static void decref(BlockingConnectionImpl *); - private: - friend class BlockingConnection; - Container container; - Connection connection; - std::string url; - Duration timeout; - int refCount; -}; - - -}} // namespace proton::reactor - -#endif /*!PROTON_CPP_CONNECTIONIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp b/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp deleted file mode 100644 index afc5f35..0000000 --- a/proton-c/bindings/cpp/src/blocking/BlockingLink.cpp +++ /dev/null @@ -1,86 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include "proton/BlockingLink.hpp" -#include "proton/BlockingConnection.hpp" -#include "proton/MessagingHandler.hpp" -#include "proton/WaitCondition.hpp" -#include "proton/Error.hpp" -#include "Msg.hpp" - - -namespace proton { -namespace reactor { - -namespace { -struct LinkOpened : public WaitCondition { - LinkOpened(pn_link_t *l) : pnLink(l) {} - bool achieved() { return !(pn_link_state(pnLink) & PN_REMOTE_UNINIT); } - pn_link_t *pnLink; -}; - -struct LinkClosed : public WaitCondition { - LinkClosed(pn_link_t *l) : pnLink(l) {} - bool achieved() { return (pn_link_state(pnLink) & PN_REMOTE_CLOSED); } - pn_link_t *pnLink; -}; - -struct LinkNotOpen : public WaitCondition { - LinkNotOpen(pn_link_t *l) : pnLink(l) {} - bool achieved() { return !(pn_link_state(pnLink) & PN_REMOTE_ACTIVE); } - pn_link_t *pnLink; -}; - - -} // namespace - - -BlockingLink::BlockingLink(BlockingConnection *c, pn_link_t *pnl) : connection(*c), link(pnl) { - std::string msg = "Opening link " + link.getName(); - LinkOpened linkOpened(link.getPnLink()); - connection.wait(linkOpened, msg); -} - -BlockingLink::~BlockingLink() {} - -void BlockingLink::waitForClosed(Duration timeout) { - std::string msg = "Closing link " + link.getName(); - LinkClosed linkClosed(link.getPnLink()); - connection.wait(linkClosed, msg); - checkClosed(); -} - -void BlockingLink::checkClosed() { - pn_link_t * pnLink = link.getPnLink(); - if (pn_link_state(pnLink) & PN_REMOTE_CLOSED) { - link.close(); - // TODO: LinkDetached exception - throw Error(MSG("Link detached")); - } -} - -void 
BlockingLink::close() { - link.close(); - std::string msg = "Closing link " + link.getName(); - LinkNotOpen linkNotOpen(link.getPnLink()); - connection.wait(linkNotOpen, msg); -} - -}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp b/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp deleted file mode 100644 index 7a24324..0000000 --- a/proton-c/bindings/cpp/src/blocking/BlockingSender.cpp +++ /dev/null @@ -1,66 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ -#include "proton/BlockingSender.hpp" -#include "proton/BlockingConnection.hpp" -#include "proton/WaitCondition.hpp" -#include "proton/Error.hpp" -#include "Msg.hpp" - - -namespace proton { -namespace reactor { - -namespace { -struct DeliverySettled : public WaitCondition { - DeliverySettled(pn_delivery_t *d) : pnDelivery(d) {} - bool achieved() { return pn_delivery_settled(pnDelivery); } - pn_delivery_t *pnDelivery; -}; - -} // namespace - - -BlockingSender::BlockingSender(BlockingConnection &c, Sender &l) : BlockingLink(&c, l.getPnLink()) { - std::string ta = link.getTarget().getAddress(); - std::string rta = link.getRemoteTarget().getAddress(); - if (ta.empty() || ta.compare(rta) != 0) { - waitForClosed(); - link.close(); - std::string txt = "Failed to open sender " + link.getName() + ", target does not match"; - throw Error(MSG("Container not started")); - } -} - -Delivery BlockingSender::send(Message &msg, Duration timeout) { - Sender snd = link; - Delivery dlv = snd.send(msg); - std::string txt = "Sending on sender " + link.getName(); - DeliverySettled cond(dlv.getPnDelivery()); - connection.wait(cond, txt, timeout); - return dlv; -} - -Delivery BlockingSender::send(Message &msg) { - // Use default timeout - return send(msg, connection.getTimeout()); -} - -}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/interop_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/interop_test.cpp b/proton-c/bindings/cpp/src/interop_test.cpp index db309f7..0b05566 100644 --- a/proton-c/bindings/cpp/src/interop_test.cpp +++ b/proton-c/bindings/cpp/src/interop_test.cpp @@ -27,7 +27,6 @@ #include <fstream> #include <streambuf> #include <iosfwd> -#include <unistd.h> using namespace std; using namespace proton; @@ -43,7 +42,7 @@ struct Fail : public logic_error { Fail(const string& what) : logic_error(what) string read(string 
filename) { - filename = testsDir+"/interop/"+filename+".amqp"; + filename = testsDir+string("/interop/")+filename+string(".amqp"); ifstream ifs(filename.c_str()); if (!ifs.good()) FAIL("Can't open " << filename); return string(istreambuf_iterator<char>(ifs), istreambuf_iterator<char>()); @@ -67,19 +66,19 @@ void testDataOstream() { void testDecoderPrimitvesExact() { Decoder d(read("primitives")); ASSERT(d.more()); - try { get<int8_t>(d); FAIL("got bool as byte"); } catch(DecodeError){} + try { get<std::int8_t>(d); FAIL("got bool as byte"); } catch(DecodeError){} ASSERT_EQUAL(true, get<bool>(d)); ASSERT_EQUAL(false, get<bool>(d)); - try { get<int8_t>(d); FAIL("got ubyte as byte"); } catch(DecodeError){} - ASSERT_EQUAL(42, get<uint8_t>(d)); - try { get<int32_t>(d); FAIL("got uint as ushort"); } catch(DecodeError){} - ASSERT_EQUAL(42, get<uint16_t>(d)); - try { get<uint16_t>(d); FAIL("got short as ushort"); } catch(DecodeError){} - ASSERT_EQUAL(-42, get<int16_t>(d)); - ASSERT_EQUAL(12345, get<uint32_t>(d)); - ASSERT_EQUAL(-12345, get<int32_t>(d)); - ASSERT_EQUAL(12345, get<uint64_t>(d)); - ASSERT_EQUAL(-12345, get<int64_t>(d)); + try { get<std::int8_t>(d); FAIL("got ubyte as byte"); } catch(DecodeError){} + ASSERT_EQUAL(42, get<std::uint8_t>(d)); + try { get<std::int32_t>(d); FAIL("got uint as ushort"); } catch(DecodeError){} + ASSERT_EQUAL(42, get<std::uint16_t>(d)); + try { get<std::uint16_t>(d); FAIL("got short as ushort"); } catch(DecodeError){} + ASSERT_EQUAL(-42, get<std::int16_t>(d)); + ASSERT_EQUAL(12345, get<std::uint32_t>(d)); + ASSERT_EQUAL(-12345, get<std::int32_t>(d)); + ASSERT_EQUAL(12345, get<std::uint64_t>(d)); + ASSERT_EQUAL(-12345, get<std::int64_t>(d)); try { get<double>(d); FAIL("got float as double"); } catch(DecodeError){} ASSERT_EQUAL(0.125, get<float>(d)); try { get<float>(d); FAIL("got double as float"); } catch(DecodeError){} @@ -91,10 +90,10 @@ void testDecoderPrimitvesExact() { void testEncoderPrimitives() { Encoder e; e << true << false; 
- e << uint8_t(42); - e << uint16_t(42) << int16_t(-42); - e << uint32_t(12345) << int32_t(-12345); - e << uint64_t(12345) << int64_t(-12345); + e << std::uint8_t(42); + e << std::uint16_t(42) << std::int16_t(-42); + e << std::uint32_t(12345) << std::int32_t(-12345); + e << std::uint64_t(12345) << std::int64_t(-12345); e << float(0.125) << double(0.125); ASSERT_EQUAL("true, false, 42, 42, -42, 12345, -12345, 12345, -12345, 0.125, 0.125", str(e)); std::string data = e.encode(); @@ -132,8 +131,7 @@ int run_test(void (*testfn)(), const char* name) { int main(int argc, char** argv) { int failed = 0; - char buf[1024]; - if (argc != 2) FAIL("Usage: " << argv[0] << " tests-dir" << " IN " << getcwd(buf, sizeof(buf))); + if (argc != 2) FAIL("Usage: " << argv[0] << " tests-dir"); testsDir = argv[1]; failed += RUN_TEST(testDataOstream); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/proton_bits.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/proton_bits.hpp b/proton-c/bindings/cpp/src/proton_bits.hpp index 55c6473..803dae1 100644 --- a/proton-c/bindings/cpp/src/proton_bits.hpp +++ b/proton-c/bindings/cpp/src/proton_bits.hpp @@ -19,6 +19,7 @@ * under the License. 
*/ +#include <string> #include <iosfwd> #include <proton/error.h> http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/bindings/cpp/src/types.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/types.cpp b/proton-c/bindings/cpp/src/types.cpp index 593d052..b34567b 100644 --- a/proton-c/bindings/cpp/src/types.cpp +++ b/proton-c/bindings/cpp/src/types.cpp @@ -20,9 +20,46 @@ #include "proton/types.hpp" #include <proton/codec.h> #include <ostream> +#include <algorithm> namespace proton { +Uuid::Uuid() { std::fill(bytes, bytes+SIZE, 0); } +Uuid::Uuid(const pn_uuid_t& u) { std::copy(u.bytes, u.bytes+SIZE, bytes); } + +Uuid::operator pn_uuid_t() const { + pn_uuid_t u; + std::copy(begin(), end(), u.bytes); + return u; +} + +bool Uuid::operator==(const Uuid& x) const { + return std::equal(begin(), end(), x.begin()); +} + +bool Uuid::operator<(const Uuid& x) const { + return std::lexicographical_compare(begin(), end(), x.begin(), x.end()) < 0; +} + +namespace { +inline std::ostream& printSegment(std::ostream& o, const Uuid& u, size_t begin, size_t end, const char* sep="") { + for (const char* p = &u[begin]; p < &u[end]; ++p) o << *p; + return o << sep; +} +} + +std::ostream& operator<<(std::ostream& o, const Uuid& u) { + std::ios_base::fmtflags ff = o.flags(); + o.flags(std::ios_base::hex); + printSegment(o, u, 0, 4, "-"); + printSegment(o, u, 4, 6, "-"); + printSegment(o, u, 6, 8, "-"); + printSegment(o, u, 8, 10, "-"); + printSegment(o, u, 10, 16); + o.flags(ff); + return o; +} + std::string typeName(TypeId t) { switch (t) { case NULL_: return "null"; @@ -67,12 +104,6 @@ pn_bytes_t pn_bytes(const std::string& s) { std::string str(const pn_bytes_t& b) { return std::string(b.start, b.size); } -pn_uuid_t pn_uuid(const std::string& s) { - pn_uuid_t u = {0}; // Zero initialized. 
- std::copy(s.begin(), s.begin() + std::max(s.size(), sizeof(pn_uuid_t::bytes)), &u.bytes[0]); - return u; -} - Start::Start(TypeId t, TypeId e, bool d, size_t s) : type(t), element(e), isDescribed(d), size(s) {} Start Start::array(TypeId element, bool described) { return Start(ARRAY, element, described); } Start Start::list() { return Start(LIST); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9f7e3462/proton-c/include/proton/codec.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/codec.h b/proton-c/include/proton/codec.h index 3ab7f8e..05b8f6f 100644 --- a/proton-c/include/proton/codec.h +++ b/proton-c/include/proton/codec.h @@ -178,7 +178,7 @@ typedef enum { } pn_type_t; /** A special invalid type value that is returned when no valid type is available. */ -extern const pn_type_t PN_INVALID; +PN_EXTERN extern const pn_type_t PN_INVALID; /** * Return a string name for an AMQP type. --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
