PROTON-865: Added basic Terminus, Broker.cpp example
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e8acc1e5 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e8acc1e5 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e8acc1e5 Branch: refs/heads/cjansen-cpp-client Commit: e8acc1e5f58975dc9d1d99d779f5e3c77289a4b1 Parents: 3c0c90f Author: Clifford Jansen <[email protected]> Authored: Mon May 11 19:29:53 2015 -0700 Committer: Alan Conway <[email protected]> Committed: Tue Jun 2 14:46:15 2015 -0400 ---------------------------------------------------------------------- proton-c/bindings/cpp/CMakeLists.txt | 4 + proton-c/bindings/cpp/examples/Broker.cpp | 193 +++++++++++++++++++ .../cpp/include/proton/cpp/Connection.h | 1 + .../bindings/cpp/include/proton/cpp/Endpoint.h | 11 ++ .../bindings/cpp/include/proton/cpp/Handle.h | 4 + proton-c/bindings/cpp/include/proton/cpp/Link.h | 6 + .../bindings/cpp/include/proton/cpp/Receiver.h | 1 + .../bindings/cpp/include/proton/cpp/Sender.h | 1 + .../bindings/cpp/include/proton/cpp/Terminus.h | 81 ++++++++ proton-c/bindings/cpp/src/Connection.cpp | 4 + proton-c/bindings/cpp/src/ConnectionImpl.cpp | 16 +- proton-c/bindings/cpp/src/ConnectionImpl.h | 3 +- proton-c/bindings/cpp/src/ContainerImpl.cpp | 2 + proton-c/bindings/cpp/src/Link.cpp | 31 ++- proton-c/bindings/cpp/src/Receiver.cpp | 2 + proton-c/bindings/cpp/src/Sender.cpp | 3 + proton-c/bindings/cpp/src/Terminus.cpp | 102 ++++++++++ 17 files changed, 448 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/CMakeLists.txt b/proton-c/bindings/cpp/CMakeLists.txt index ba35cb1..54d8ddd 100644 --- a/proton-c/bindings/cpp/CMakeLists.txt +++ b/proton-c/bindings/cpp/CMakeLists.txt 
@@ -37,6 +37,7 @@ set (qpid-proton-cpp-core src/Event.cpp src/Handler.cpp src/Link.cpp + src/Terminus.cpp src/Acceptor.cpp src/Url.cpp src/Message.cpp @@ -99,6 +100,9 @@ add_executable (SimpleRecv examples/SimpleRecv.cpp) target_link_libraries (SimpleRecv qpid-proton-cpp) add_executable (SimpleSend examples/SimpleSend.cpp) target_link_libraries (SimpleSend qpid-proton-cpp) +add_executable (Broker examples/Broker.cpp) +target_link_libraries (Broker qpid-proton-cpp) + install (TARGETS qpid-proton-cpp EXPORT proton http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/examples/Broker.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/examples/Broker.cpp b/proton-c/bindings/cpp/examples/Broker.cpp new file mode 100644 index 0000000..7d5214d --- /dev/null +++ b/proton-c/bindings/cpp/examples/Broker.cpp @@ -0,0 +1,193 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ *
+ */
+
+#include "proton/cpp/Container.h"
+#include "proton/cpp/MessagingHandler.h"
+
+#include <iostream>
+#include <deque>
+#include <map>
+#include <list>
+#include <string>
+
+
+using namespace proton::reactor;
+
+/**
+ * Produce the address for a dynamically created queue.
+ * Not implemented yet: every call throws a string literal.
+ */
+std::string generateUuid(){
+    throw "TODO: platform neutral uuid";
+}
+
+/**
+ * A message queue together with the consumer links subscribed to it.
+ */
+class Queue {
+  public:
+    bool dynamic;
+    typedef std::deque<Message> MsgQ;
+    typedef std::list<Sender> List;
+    MsgQ queue;
+    List consumers;
+
+    Queue(bool dyn = false) : dynamic(dyn), queue(MsgQ()), consumers(List()) {}
+
+    /** Add a consumer link to the subscriber list. */
+    void subscribe(Sender &c) {
+        consumers.push_back(c);
+    }
+
+    /**
+     * Remove a consumer link from the subscriber list.
+     * @return true when no consumers remain and the queue is either dynamic
+     *         or empty; the Broker uses this to decide whether to delete it.
+     */
+    bool unsubscribe(Sender &c) {
+        consumers.remove(c);
+        return consumers.empty() && (dynamic || queue.empty());
+    }
+
+    /** Append a message and try to hand pending messages to all consumers. */
+    void publish(Message &m) {
+        queue.push_back(m);
+        dispatch(0);
+    }
+
+    /**
+     * Keep delivering until deliverTo() reports that nothing more was sent.
+     * @param s a single consumer to serve, or 0 to serve every subscriber.
+     */
+    void dispatch(Sender *s) {
+        while (deliverTo(s)) {
+        }
+    }
+
+    /**
+     * Make one delivery pass: each visited consumer that has credit receives
+     * at most one message from the front of the queue.
+     * @param consumer the only consumer to visit, or 0 to visit all subscribers.
+     * @return true if the pass visited every intended consumer and sent at
+     *         least one message; false if there was nobody to visit, nothing
+     *         was sent, or the queue ran empty before the pass finished.
+     */
+    bool deliverTo(Sender *consumer) {
+        int count = consumer ? 1 : consumers.size();
+        if (!count) return false;
+        bool result = false;
+        List::iterator it = consumers.begin();
+        if (!consumer) consumer = &*it;
+
+        while (queue.size()) {
+            if (consumer->getCredit()) {
+                consumer->send(queue.front());
+                queue.pop_front();
+                result = true;
+            }
+            if (--count == 0)
+                return result;
+            // Move on to the next subscriber.  The pointer has to follow the
+            // iterator, otherwise every message would go to the first one.
+            ++it;
+            consumer = &*it;
+        }
+        return false;
+    }
+};
+
+/**
+ * Minimal broker: listens on a URL and routes messages through named
+ * queues that are created on demand.
+ */
+class Broker : public MessagingHandler {
+  private:
+    std::string url;
+    typedef std::map<std::string, Queue *> QMap;
+    QMap queues;
+  public:
+
+    Broker(const std::string &s) : url(s), queues(QMap()) {}
+
+    /** Start listening on the configured URL. */
+    void onStart(Event &e) {
+        e.getContainer().listen(url);
+    }
+
+    /** Return the queue registered for address, creating it if absent. */
+    Queue &queue(std::string &address) {
+        QMap::iterator it = queues.find(address);
+        if (it == queues.end())
+            it = queues.insert(QMap::value_type(address, new Queue())).first;
+        return *it->second;
+    }
+
+    /**
+     * Attach an opening link to its queue.  An outgoing (sender) link is
+     * subscribed to the queue named by the remote source, or to a new
+     * dynamic queue when the remote source asks for one.  An incoming link
+     * only has the remote target address copied to the local target.
+     */
+    void onLinkOpening(Event &e) {
+        Link lnk = e.getLink();
+        if (lnk.isSender()) {
+            Sender sender(lnk);
+            Terminus remoteSource(lnk.getRemoteSource());
+            if (remoteSource.isDynamic()) {
+                std::string address = generateUuid();
+                lnk.getSource().setAddress(address);
+                Queue *q = new Queue(true);
+                queues[address] = q;
+                q->subscribe(sender);
+            }
+            else {
+                std::string address = remoteSource.getAddress();
+                if (!address.empty()) {
+                    lnk.getSource().setAddress(address);
+                    queue(address).subscribe(sender);
+                }
+            }
+        }
+        else {
+            std::string address = lnk.getRemoteTarget().getAddress();
+            if (!address.empty())
+                lnk.getTarget().setAddress(address);
+        }
+    }
+
+    /**
+     * Detach a consumer from the queue named by its source address and
+     * delete the queue when Queue::unsubscribe() reports it is disposable.
+     */
+    void unsubscribe (Sender &lnk) {
+        std::string address = lnk.getSource().getAddress();
+        QMap::iterator it = queues.find(address);
+        if (it != queues.end() && it->second->unsubscribe(lnk)) {
+            delete it->second;
+            queues.erase(it);
+        }
+    }
+
+    /** Unsubscribe a closing sender link. */
+    void onLinkClosing(Event &e) {
+        Link lnk = e.getLink();
+        if (lnk.isSender()) {
+            Sender s(lnk);
+            unsubscribe(s);
+        }
+    }
+
+    void onConnectionClosing(Event &e) {
+        removeStaleConsumers(e.getConnection());
+    }
+
+    void onDisconnected(Event &e) {
+        removeStaleConsumers(e.getConnection());
+    }
+
+    /** Unsubscribe every REMOTE_ACTIVE sender link of the connection. */
+    void removeStaleConsumers(Connection &connection) {
+        Link l = connection.getLinkHead(Endpoint::REMOTE_ACTIVE);
+        while (l) {
+            if (l.isSender()) {
+                Sender s(l);
+                unsubscribe(s);
+            }
+            l = l.getNext(Endpoint::REMOTE_ACTIVE);
+        }
+    }
+
+    /** A consumer can take more messages: serve it from its queue. */
+    void onSendable(Event &e) {
+        Link lnk = e.getLink();
+        Sender sender(lnk);
+        std::string addr = lnk.getSource().getAddress();
+        queue(addr).dispatch(&sender);
+    }
+
+    /** Publish an incoming message to the queue named by the link target. */
+    void onMessage(Event &e) {
+        std::string addr = e.getLink().getTarget().getAddress();
+        Message msg = e.getMessage();
+        queue(addr).publish(msg);
+    }
+};
+
+int main(int argc, char **argv) {
+    Broker hw("localhost:5672");
+    Container(hw).run();
+}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/include/proton/cpp/Connection.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Connection.h b/proton-c/bindings/cpp/include/proton/cpp/Connection.h index
7d97ebb..86abbe6 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Connection.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Connection.h @@ -57,6 +57,7 @@ class Connection : public Endpoint, public Handle<ConnectionImpl> PROTON_CPP_EXTERN Container &getContainer(); PROTON_CPP_EXTERN std::string getHostname(); virtual PROTON_CPP_EXTERN Connection &getConnection(); + PROTON_CPP_EXTERN Link getLinkHead(Endpoint::State mask); private: friend class PrivateImplRef<Connection>; friend class Connector; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/include/proton/cpp/Endpoint.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Endpoint.h b/proton-c/bindings/cpp/include/proton/cpp/Endpoint.h index 9992eff..fc1a712 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Endpoint.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Endpoint.h @@ -22,6 +22,7 @@ * */ #include "proton/cpp/ImportExport.h" +#include "proton/connection.h" namespace proton { namespace reactor { @@ -33,6 +34,16 @@ class Transport; class Endpoint { public: + enum { + LOCAL_UNINIT = PN_LOCAL_UNINIT, + REMOTE_UNINIT = PN_REMOTE_UNINIT, + LOCAL_ACTIVE = PN_LOCAL_ACTIVE, + REMOTE_ACTIVE = PN_REMOTE_ACTIVE, + LOCAL_CLOSED = PN_LOCAL_CLOSED, + REMOTE_CLOSED = PN_REMOTE_CLOSED + }; + typedef int State; + // TODO: getCondition, getRemoteCondition, updateCondition, get/setHandler virtual PROTON_CPP_EXTERN Connection &getConnection() = 0; Transport PROTON_CPP_EXTERN &getTransport(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/include/proton/cpp/Handle.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Handle.h b/proton-c/bindings/cpp/include/proton/cpp/Handle.h index 632e30e..77b7814 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Handle.h +++ 
b/proton-c/bindings/cpp/include/proton/cpp/Handle.h @@ -51,6 +51,10 @@ template <class T> class Handle { /** Operator ! supports idiom if (!handle) { do_if_handle_is_null(); } */ PROTON_CPP_INLINE_EXTERN bool operator !() const { return !impl; } + /** Operator == equal if they point to same non-null object*/ + PROTON_CPP_INLINE_EXTERN bool operator ==(const Handle<T>& other) const { return impl == other.impl; } + PROTON_CPP_INLINE_EXTERN bool operator !=(const Handle<T>& other) const { return impl != other.impl; } + void swap(Handle<T>& h) { T* t = h.impl; h.impl = impl; impl = t; } private: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/include/proton/cpp/Link.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Link.h b/proton-c/bindings/cpp/include/proton/cpp/Link.h index 21b1ca2..265d80d 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Link.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Link.h @@ -24,6 +24,7 @@ #include "proton/cpp/ImportExport.h" #include "proton/cpp/ProtonHandle.h" #include "proton/cpp/Endpoint.h" +#include "proton/cpp/Terminus.h" #include "proton/types.h" #include <string> @@ -45,8 +46,13 @@ class Link : public Endpoint, public ProtonHandle<pn_link_t> PROTON_CPP_EXTERN bool isSender(); PROTON_CPP_EXTERN bool isReceiver(); PROTON_CPP_EXTERN int getCredit(); + PROTON_CPP_EXTERN Terminus getSource(); + PROTON_CPP_EXTERN Terminus getTarget(); + PROTON_CPP_EXTERN Terminus getRemoteSource(); + PROTON_CPP_EXTERN Terminus getRemoteTarget(); PROTON_CPP_EXTERN pn_link_t *getPnLink() const; virtual PROTON_CPP_EXTERN Connection &getConnection(); + PROTON_CPP_EXTERN Link getNext(Endpoint::State mask); protected: virtual void verifyType(pn_link_t *l); private: http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/include/proton/cpp/Receiver.h 
---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Receiver.h b/proton-c/bindings/cpp/include/proton/cpp/Receiver.h index 197cfb1..c904dc9 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Receiver.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Receiver.h @@ -37,6 +37,7 @@ class Receiver : public Link public: PROTON_CPP_EXTERN Receiver(pn_link_t *lnk); PROTON_CPP_EXTERN Receiver(); + PROTON_CPP_EXTERN Receiver(const Link& c); protected: virtual void verifyType(pn_link_t *l); }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/include/proton/cpp/Sender.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Sender.h b/proton-c/bindings/cpp/include/proton/cpp/Sender.h index fa8cce8..9b8683d 100644 --- a/proton-c/bindings/cpp/include/proton/cpp/Sender.h +++ b/proton-c/bindings/cpp/include/proton/cpp/Sender.h @@ -39,6 +39,7 @@ class Sender : public Link public: PROTON_CPP_EXTERN Sender(pn_link_t *lnk); PROTON_CPP_EXTERN Sender(); + PROTON_CPP_EXTERN Sender(const Link& c); PROTON_CPP_EXTERN void send(Message &m); protected: virtual void verifyType(pn_link_t *l); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/include/proton/cpp/Terminus.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/cpp/Terminus.h b/proton-c/bindings/cpp/include/proton/cpp/Terminus.h new file mode 100644 index 0000000..092fcc1 --- /dev/null +++ b/proton-c/bindings/cpp/include/proton/cpp/Terminus.h @@ -0,0 +1,81 @@ +#ifndef PROTON_CPP_TERMINUS_H +#define PROTON_CPP_TERMINUS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/cpp/ImportExport.h"
+#include "proton/cpp/Link.h"
+
+#include "proton/link.h"
+#include <string>
+
+namespace proton {
+namespace reactor {
+
+class Link;
+
+/**
+ * Handle for one end point (source or target) of a link, wrapping a
+ * pn_terminus_t.  Instances other than the empty default are obtained
+ * from Link, which is the only class able to call the private constructor.
+ */
+class Terminus : public ProtonHandle<pn_terminus_t>
+{
+  public:
+    // These enums are public so that callers can name the values taken by
+    // setType(), setExpiryPolicy() and setDistributionMode().
+    enum Type {
+        TYPE_UNSPECIFIED = PN_UNSPECIFIED,
+        SOURCE = PN_SOURCE,
+        TARGET = PN_TARGET,
+        COORDINATOR = PN_COORDINATOR
+    };
+    // NOTE(review): PN_NONDURABLE / PN_CONFIGURATION / PN_DELIVERIES look like
+    // pn_durability_t values in the C API rather than pn_expiry_policy_t
+    // ones -- confirm against proton/terminus.h before relying on them.
+    enum ExpiryPolicy {
+        NONDURABLE = PN_NONDURABLE,
+        CONFIGURATION = PN_CONFIGURATION,
+        DELIVERIES = PN_DELIVERIES
+    };
+    enum DistributionMode {
+        MODE_UNSPECIFIED = PN_DIST_MODE_UNSPECIFIED,
+        COPY = PN_DIST_MODE_COPY,
+        MOVE = PN_DIST_MODE_MOVE
+    };
+
+    PROTON_CPP_EXTERN Terminus();
+    PROTON_CPP_EXTERN ~Terminus();
+    PROTON_CPP_EXTERN Terminus(const Terminus&);
+    PROTON_CPP_EXTERN Terminus& operator=(const Terminus&);
+    PROTON_CPP_EXTERN pn_terminus_t *getPnTerminus();
+    PROTON_CPP_EXTERN Type getType();
+    PROTON_CPP_EXTERN void setType(Type);
+    PROTON_CPP_EXTERN ExpiryPolicy getExpiryPolicy();
+    PROTON_CPP_EXTERN void setExpiryPolicy(ExpiryPolicy);
+    PROTON_CPP_EXTERN DistributionMode getDistributionMode();
+    PROTON_CPP_EXTERN void setDistributionMode(DistributionMode);
+    PROTON_CPP_EXTERN std::string getAddress();
+    PROTON_CPP_EXTERN void setAddress(std::string &);
+    PROTON_CPP_EXTERN bool isDynamic();
+    PROTON_CPP_EXTERN void setDynamic(bool);
+
+  private:
+    // Link handle this terminus was obtained from; 0 for an empty Terminus.
+    Link *link;
+    PROTON_CPP_EXTERN Terminus(pn_terminus_t *, Link *);
+    friend class Link;
+    friend class ProtonImplRef<Terminus>;
+};
+
+
+}} // namespace proton::reactor
+
+#endif /*!PROTON_CPP_TERMINUS_H*/
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/src/Connection.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Connection.cpp b/proton-c/bindings/cpp/src/Connection.cpp index 49d171e..1db8fbc 100644 --- a/proton-c/bindings/cpp/src/Connection.cpp +++ b/proton-c/bindings/cpp/src/Connection.cpp @@ -66,4 +66,8 @@ Connection &Connection::getConnection() { Container &Connection::getContainer() { return impl->getContainer(); } +Link Connection::getLinkHead(Endpoint::State mask) { + return impl->getLinkHead(mask); +} + }} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/src/ConnectionImpl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.cpp b/proton-c/bindings/cpp/src/ConnectionImpl.cpp index be01f8d..9cadffe 100644 --- a/proton-c/bindings/cpp/src/ConnectionImpl.cpp +++ b/proton-c/bindings/cpp/src/ConnectionImpl.cpp @@ -41,10 +41,12 @@ void ConnectionImpl::decref(ConnectionImpl *impl) { delete impl; } -ConnectionImpl::ConnectionImpl(Container &c) : container(c), refCount(0), override(0), transport(0), defaultSession(0), - pnConnection(pn_reactor_connection(container.getReactor(), NULL)), +ConnectionImpl::ConnectionImpl(Container &c, pn_connection_t *pnConn) : container(c), refCount(0), override(0), transport(0), defaultSession(0), + pnConnection(pnConn), reactorReference(this) { + if (!pnConnection) + pnConnection = pn_reactor_connection(container.getReactor(), NULL); setConnectionContext(pnConnection, this); } @@ -110,12 +112,14 @@ Connection &ConnectionImpl::getReactorReference(pn_connection_t *conn) { Container container(getContainerContext(reactor));
if (!container) // can't be one created by our container throw ProtonException(MSG("Unknown Proton connection specifier")); - Connection connection(container); - impl = connection.impl; - setConnectionContext(conn, impl); - impl->reactorReference = connection; + impl = new ConnectionImpl(container, conn); } return impl->reactorReference; } +Link ConnectionImpl::getLinkHead(Endpoint::State mask) { + return Link(pn_link_head(pnConnection, mask)); +} + + }} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/src/ConnectionImpl.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ConnectionImpl.h b/proton-c/bindings/cpp/src/ConnectionImpl.h index ad8d71e..11b5765 100644 --- a/proton-c/bindings/cpp/src/ConnectionImpl.h +++ b/proton-c/bindings/cpp/src/ConnectionImpl.h @@ -39,7 +39,7 @@ class Container; class ConnectionImpl : public Endpoint { public: - PROTON_CPP_EXTERN ConnectionImpl(Container &c); + PROTON_CPP_EXTERN ConnectionImpl(Container &c, pn_connection_t *pnConn = 0); PROTON_CPP_EXTERN ~ConnectionImpl(); PROTON_CPP_EXTERN Transport &getTransport(); PROTON_CPP_EXTERN Handler *getOverride(); @@ -49,6 +49,7 @@ class ConnectionImpl : public Endpoint PROTON_CPP_EXTERN pn_connection_t *getPnConnection(); PROTON_CPP_EXTERN Container &getContainer(); PROTON_CPP_EXTERN std::string getHostname(); + PROTON_CPP_EXTERN Link getLinkHead(Endpoint::State mask); virtual PROTON_CPP_EXTERN Connection &getConnection(); static Connection &getReactorReference(pn_connection_t *); static ConnectionImpl *getImpl(const Connection &c) { return c.impl; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/src/ContainerImpl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ContainerImpl.cpp b/proton-c/bindings/cpp/src/ContainerImpl.cpp index d1339f0..1cabf6c 100644 --- 
a/proton-c/bindings/cpp/src/ContainerImpl.cpp +++ b/proton-c/bindings/cpp/src/ContainerImpl.cpp @@ -122,6 +122,8 @@ class OverrideHandler : public Handler ConnectionImpl *cimpl = getConnectionContext(conn); if (cimpl) cimpl->reactorDetach(); + // TODO: remember all connections and do reactorDetach of zombies connections + // not pn_connection_release'd at PN_REACTOR_FINAL. } } }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/src/Link.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Link.cpp b/proton-c/bindings/cpp/src/Link.cpp index 3356b38..aab01d9 100644 --- a/proton-c/bindings/cpp/src/Link.cpp +++ b/proton-c/bindings/cpp/src/Link.cpp @@ -33,12 +33,6 @@ namespace proton { namespace reactor { -namespace { - -static inline void throwIfNull(pn_link_t *l) { if (!l) throw ProtonException(MSG("Disassociated link")); } - -} - template class ProtonHandle<pn_link_t>; typedef ProtonImplRef<Link> PI; @@ -67,12 +61,10 @@ void Link::verifyType(pn_link_t *l) {} // Generic link can be sender or receiver pn_link_t *Link::getPnLink() const { return impl; } void Link::open() { - throwIfNull(impl); pn_link_open(impl); } void Link::close() { - throwIfNull(impl); pn_link_close(impl); } @@ -85,15 +77,34 @@ bool Link::isReceiver() { } int Link::getCredit() { - throwIfNull(impl); return pn_link_credit(impl); } +Terminus Link::getSource() { + return Terminus(pn_link_source(impl), this); +} + +Terminus Link::getTarget() { + return Terminus(pn_link_target(impl), this); +} + +Terminus Link::getRemoteSource() { + return Terminus(pn_link_remote_source(impl), this); +} + +Terminus Link::getRemoteTarget() { + return Terminus(pn_link_remote_target(impl), this); +} + Connection &Link::getConnection() { - throwIfNull(impl); pn_session_t *s = pn_link_session(impl); pn_connection_t *c = pn_session_connection(s); return ConnectionImpl::getReactorReference(c); } +Link Link::getNext(Endpoint::State 
mask) { + + return Link(pn_link_next(impl, (pn_state_t) mask)); +} + }} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/src/Receiver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Receiver.cpp b/proton-c/bindings/cpp/src/Receiver.cpp index 557e736..ad9a6d1 100644 --- a/proton-c/bindings/cpp/src/Receiver.cpp +++ b/proton-c/bindings/cpp/src/Receiver.cpp @@ -34,6 +34,8 @@ namespace reactor { Receiver::Receiver(pn_link_t *lnk) : Link(lnk) {} Receiver::Receiver() : Link(0) {} +Receiver::Receiver(const Link& c) : Link(c.getPnLink()) {} + void Receiver::verifyType(pn_link_t *lnk) { if (lnk && pn_link_is_sender(lnk)) throw ProtonException(MSG("Creating receiver with sender context")); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/src/Sender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Sender.cpp b/proton-c/bindings/cpp/src/Sender.cpp index 74d4b0f..c521ad1 100644 --- a/proton-c/bindings/cpp/src/Sender.cpp +++ b/proton-c/bindings/cpp/src/Sender.cpp @@ -46,6 +46,9 @@ void Sender::verifyType(pn_link_t *lnk) { throw ProtonException(MSG("Creating sender with receiver context")); } +Sender::Sender(const Link& c) : Link(c.getPnLink()) {} + + namespace{ // revisit if thread safety required uint64_t tagCounter = 0; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e8acc1e5/proton-c/bindings/cpp/src/Terminus.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Terminus.cpp b/proton-c/bindings/cpp/src/Terminus.cpp new file mode 100644 index 0000000..f66979e --- /dev/null +++ b/proton-c/bindings/cpp/src/Terminus.cpp @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "proton/cpp/Link.h"
+#include "proton/link.h"
+
+namespace proton {
+namespace reactor {
+
+template class ProtonHandle<pn_terminus_t>;
+typedef ProtonImplRef<Terminus> PI;
+
+// Note: the pn_terminus_t is not ref counted. We count the parent link.
+// A reference on the parent pn_link_t is held exactly when impl is non-null,
+// so every pn_incref/pn_decref below is guarded by impl.  This also keeps an
+// empty Terminus (impl == 0, link == 0) safe to copy and assign.
+
+/** Construct an empty Terminus that refers to no terminus and no link. */
+Terminus::Terminus() : link(0) {
+    impl = 0;
+}
+
+/** Wrap terminus p of link l and take a reference on l's pn_link_t. */
+Terminus::Terminus(pn_terminus_t *p, Link *l) : link(l) {
+    impl = p;
+    if (impl) pn_incref(link->getPnLink());
+}
+
+/** Copy c, taking an additional reference on the parent link if c is non-empty. */
+Terminus::Terminus(const Terminus& c) : ProtonHandle<pn_terminus_t>() {
+    impl = c.impl;
+    link = c.link;
+    if (impl) pn_incref(link->getPnLink());
+}
+
+/** Release the currently held link reference (if any) and adopt that of c. */
+Terminus& Terminus::operator=(const Terminus& c) {
+    if (impl == c.impl) return *this;
+    if (impl) pn_decref(link->getPnLink());
+    impl = c.impl;
+    link = c.link;
+    if (impl) pn_incref(link->getPnLink());
+    return *this;
+}
+
+/** Drop the reference on the parent link, if one is held. */
+Terminus::~Terminus() {
+    if (impl)
+        pn_decref(link->getPnLink());
+}
+
+pn_terminus_t *Terminus::getPnTerminus() { return impl; }
+
+Terminus::Type Terminus::getType() {
+    return static_cast<Type>(pn_terminus_get_type(impl));
+}
+
+void Terminus::setType(Type type) {
+    pn_terminus_set_type(impl, static_cast<pn_terminus_type_t>(type));
+}
+
+Terminus::ExpiryPolicy Terminus::getExpiryPolicy() {
+    // Read the expiry policy itself, the counterpart of setExpiryPolicy().
+    return static_cast<ExpiryPolicy>(pn_terminus_get_expiry_policy(impl));
+}
+
+void Terminus::setExpiryPolicy(ExpiryPolicy policy) {
+    pn_terminus_set_expiry_policy(impl, static_cast<pn_expiry_policy_t>(policy));
+}
+
+Terminus::DistributionMode Terminus::getDistributionMode() {
+    // Read the distribution mode, the counterpart of setDistributionMode().
+    return static_cast<DistributionMode>(pn_terminus_get_distribution_mode(impl));
+}
+
+void Terminus::setDistributionMode(DistributionMode mode) {
+    pn_terminus_set_distribution_mode(impl, static_cast<pn_distribution_mode_t>(mode));
+}
+
+/** Return the terminus address, or an empty string when none is set. */
+std::string Terminus::getAddress() {
+    const char *addr = pn_terminus_get_address(impl);
+    return addr ? std::string(addr) : std::string();
+}
+
+void Terminus::setAddress(std::string &addr) {
+    pn_terminus_set_address(impl, addr.c_str());
+}
+
+bool Terminus::isDynamic() {
+    return pn_terminus_is_dynamic(impl);
+}
+
+void Terminus::setDynamic(bool d) {
+    pn_terminus_set_dynamic(impl, d);
+}
+
+}} // namespace proton::reactor
--------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
