http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/ContainerImpl.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ContainerImpl.cpp b/proton-c/bindings/cpp/src/ContainerImpl.cpp new file mode 100644 index 0000000..7ff9c1d --- /dev/null +++ b/proton-c/bindings/cpp/src/ContainerImpl.cpp @@ -0,0 +1,301 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/Container.h" +#include "proton/cpp/MessagingEvent.h" +#include "proton/cpp/Connection.h" +#include "proton/cpp/Session.h" +#include "proton/cpp/MessagingAdapter.h" +#include "proton/cpp/Acceptor.h" +#include "proton/cpp/exceptions.h" +#include "LogInternal.h" + +#include "ContainerImpl.h" +#include "ConnectionImpl.h" +#include "Connector.h" +#include "contexts.h" +#include "Url.h" +#include "platform.h" +#include "PrivateImplRef.h" + +#include "proton/connection.h" +#include "proton/session.h" + +namespace proton { +namespace reactor { + +namespace { + +ConnectionImpl *getImpl(const Connection &c) { + return PrivateImplRef<Connection>::get(c); +} + +ContainerImpl *getImpl(const Container &c) { + return PrivateImplRef<Container>::get(c); +} + +} // namespace + + +class CHandler : public Handler +{ + public: + CHandler(pn_handler_t *h) : pnHandler(h) { + pn_incref(pnHandler); + } + ~CHandler() { + pn_decref(pnHandler); + } + pn_handler_t *getPnHandler() { return pnHandler; } + private: + pn_handler_t *pnHandler; +}; + + +void dispatch(Handler &h, MessagingEvent &e) { + // TODO: also dispatch to add()'ed Handlers + CHandler *chandler; + int type = e.getType(); + if (type && (chandler = dynamic_cast<CHandler*>(&h))) { + // event and handler are both native Proton C + pn_handler_dispatch(chandler->getPnHandler(), e.getPnEvent(), (pn_event_type_t) type); + } + else + e.dispatch(h); +} + +// Used to sniff for Connector events before the reactor's global handler sees them. +class OverrideHandler : public Handler +{ + public: + pn_handler_t *baseHandler; + + OverrideHandler(pn_handler_t *h) : baseHandler(h) { + pn_incref(baseHandler); + } + ~OverrideHandler() { + pn_decref(baseHandler); + } + + + virtual void onUnhandled(Event &e) { + ProtonEvent *pne = dynamic_cast<ProtonEvent *>(&e); + // If not a Proton reactor event, nothing to override, nothing to pass along. + if (!pne) return; + int type = pne->getType(); + if (!type) return; // Also not from the reactor + + pn_event_t *cevent = pne->getPnEvent(); + pn_connection_t *conn = pn_event_connection(cevent); + if (conn && type != PN_CONNECTION_INIT) { + // send to override handler first + ConnectionImpl *connection = getConnectionContext(conn); + if (connection) { + Handler *override = connection->getOverride(); + if (override) + e.dispatch(*override); + } + } + + pn_handler_dispatch(baseHandler, cevent, (pn_event_type_t) type); + + if (conn && type == PN_CONNECTION_FINAL) { + // TODO: this must be the last acation of the last handler looking at + // connection events. Better: generate a custom FINAL event (or task). Or move to + // separate event streams per connection as part of multi threading support. + ConnectionImpl *cimpl = getConnectionContext(conn); + if (cimpl) + cimpl->reactorDetach(); + } + } +}; + +namespace { + +// TODO: configurable policy. SessionPerConnection for now. +Session getDefaultSession(pn_connection_t *conn, pn_session_t **ses) { + if (!*ses) { + *ses = pn_session(conn); + pn_session_open(*ses); + } + return Session(*ses); +} + + +struct InboundContext { + ContainerImpl *containerImpl; + Container containerRef; // create only once for all inbound events + Handler *cppHandler; +}; + +ContainerImpl *getContainerImpl(pn_handler_t *c_handler) { + struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler); + return ctxt->containerImpl; +} + +Container &getContainerRef(pn_handler_t *c_handler) { + struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler); + return ctxt->containerRef; +} + +Handler &getCppHandler(pn_handler_t *c_handler) { + struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler); + return *ctxt->cppHandler; +} + +void cpp_handler_dispatch(pn_handler_t *c_handler, pn_event_t *cevent, pn_event_type_t type) +{ + MessagingEvent ev(cevent, type, getContainerRef(c_handler)); + dispatch(getCppHandler(c_handler), ev); +} + +void cpp_handler_cleanup(pn_handler_t *c_handler) +{ + struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(c_handler); + ctxt->containerRef.~Container(); +} + +pn_handler_t *cpp_handler(ContainerImpl *c, Handler *h) +{ + pn_handler_t *handler = pn_handler_new(cpp_handler_dispatch, sizeof(struct InboundContext), cpp_handler_cleanup); + struct InboundContext *ctxt = (struct InboundContext *) pn_handler_mem(handler); + ctxt->containerRef = Container(c); + ctxt->containerImpl = c; + ctxt->cppHandler = h; + return handler; +} + + +} // namespace + + +void ContainerImpl::incref(ContainerImpl *impl) { + impl->refCount++; +} + +void ContainerImpl::decref(ContainerImpl *impl) { + impl->refCount--; + if (impl->refCount == 0) + delete impl; +} + +ContainerImpl::ContainerImpl(MessagingHandler &mhandler) : + reactor(0), globalHandler(0), messagingHandler(mhandler), containerId(generateUuid()), + refCount(0) +{ +} + +ContainerImpl::~ContainerImpl() {} + +Connection ContainerImpl::connect(std::string &host) { + if (!reactor) throw ProtonException(MSG("Container not initialized")); + Container cntnr(this); + Connection connection(cntnr); + Connector *connector = new Connector(connection); + // Connector self-deletes depending on reconnect logic + connector->setAddress(host); // TODO: url vector + connection.setOverride(connector); + connection.open(); + return connection; +} + +pn_reactor_t *ContainerImpl::getReactor() { return reactor; } + +pn_handler_t *ContainerImpl::getGlobalHandler() { return globalHandler; } + +std::string ContainerImpl::getContainerId() { return containerId; } + + +Sender ContainerImpl::createSender(Connection &connection, std::string &addr) { + Session session = getDefaultSession(connection.getPnConnection(), &getImpl(connection)->defaultSession); + Sender snd = session.createSender(containerId + '-' + addr); + pn_terminus_set_address(pn_link_target(snd.getPnLink()), addr.c_str()); + snd.open(); + + ConnectionImpl *connImpl = getImpl(connection); + return snd; +} + +Sender ContainerImpl::createSender(std::string &urlString) { + Connection conn = connect(urlString); + Session session = getDefaultSession(conn.getPnConnection(), &getImpl(conn)->defaultSession); + std::string path = Url(urlString).getPath(); + Sender snd = session.createSender(containerId + '-' + path); + pn_terminus_set_address(pn_link_target(snd.getPnLink()), path.c_str()); + snd.open(); + + ConnectionImpl *connImpl = getImpl(conn); + return snd; +} + +Receiver ContainerImpl::createReceiver(Connection &connection, std::string &addr) { + ConnectionImpl *connImpl = getImpl(connection); + Session session = getDefaultSession(connImpl->pnConnection, &connImpl->defaultSession); + Receiver rcv = session.createReceiver(containerId + '-' + addr); + pn_terminus_set_address(pn_link_source(rcv.getPnLink()), addr.c_str()); + rcv.open(); + return rcv; +} + +Acceptor ContainerImpl::acceptor(const std::string &host, const std::string &port) { + pn_acceptor_t *acptr = pn_reactor_acceptor(reactor, host.c_str(), port.c_str(), NULL); + if (acptr) + return Acceptor(acptr); + else + throw ProtonException(MSG("accept fail: " << pn_error_text(pn_io_error(pn_reactor_io(reactor))) << "(" << host << ":" << port << ")")); +} + +Acceptor ContainerImpl::listen(const std::string &urlString) { + Url url(urlString); + // TODO: SSL + return acceptor(url.getHost(), url.getPort()); +} + + +void ContainerImpl::run() { + reactor = pn_reactor(); + // Set our context on the reactor + setContainerContext(reactor, this); + + // Set the reactor's main/default handler (see note below) + MessagingAdapter messagingAdapter(messagingHandler); + messagingHandler.addChildHandler(messagingAdapter); + pn_handler_t *cppHandler = cpp_handler(this, &messagingHandler); + pn_reactor_set_handler(reactor, cppHandler); + + // Set our own global handler that "subclasses" the existing one + pn_handler_t *cGlobalHandler = pn_reactor_get_global_handler(reactor); + pn_incref(cGlobalHandler); + OverrideHandler overrideHandler(cGlobalHandler); + pn_handler_t *cppGlobalHandler = cpp_handler(this, &overrideHandler); + pn_reactor_set_global_handler(reactor, cppGlobalHandler); + + // Note: we have just set up the following 4 handlers that see events in this order: + // messagingHandler, messagingAdapter, connector override, the reactor's default global + // handler (pn_iohandler) + // TODO: remove fifth pn_handshaker once messagingAdapter matures + + pn_reactor_run(reactor); + pn_decref(cGlobalHandler); + pn_reactor_free(reactor); + reactor = 0; +} + +}} // namespace proton::reactor
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/ContainerImpl.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ContainerImpl.h b/proton-c/bindings/cpp/src/ContainerImpl.h new file mode 100644 index 0000000..a14bf52 --- /dev/null +++ b/proton-c/bindings/cpp/src/ContainerImpl.h @@ -0,0 +1,69 @@ +#ifndef PROTON_CPP_CONTAINERIMPL_H +#define PROTON_CPP_CONTAINERIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/ImportExport.h" +#include "proton/cpp/MessagingHandler.h" +#include "proton/cpp/Connection.h" +#include "proton/cpp/Link.h" + +#include "proton/reactor.h" + +#include <string> +namespace proton { +namespace reactor { + +class DispatchHelper; +class Connection; +class Connector; +class Acceptor; + +class ContainerImpl +{ + public: + PROTON_CPP_EXTERN ContainerImpl(MessagingHandler &mhandler); + PROTON_CPP_EXTERN ~ContainerImpl(); + PROTON_CPP_EXTERN Connection connect(std::string &host); + PROTON_CPP_EXTERN void run(); + PROTON_CPP_EXTERN pn_reactor_t *getReactor(); + PROTON_CPP_EXTERN pn_handler_t *getGlobalHandler(); + PROTON_CPP_EXTERN Sender createSender(Connection &connection, std::string &addr); + PROTON_CPP_EXTERN Sender createSender(std::string &url); + PROTON_CPP_EXTERN Receiver createReceiver(Connection &connection, std::string &addr); + PROTON_CPP_EXTERN Acceptor listen(const std::string &url); + PROTON_CPP_EXTERN std::string getContainerId(); + static void incref(ContainerImpl *); + static void decref(ContainerImpl *); + private: + void dispatch(pn_event_t *event, pn_event_type_t type); + Acceptor acceptor(const std::string &host, const std::string &port); + pn_reactor_t *reactor; + pn_handler_t *globalHandler; + MessagingHandler &messagingHandler; + std::string containerId; + int refCount; +}; + + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_CONTAINERIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Endpoint.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Endpoint.cpp b/proton-c/bindings/cpp/src/Endpoint.cpp new file mode 100644 index 0000000..868f361 --- /dev/null +++ b/proton-c/bindings/cpp/src/Endpoint.cpp @@ -0,0 +1,37 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/Endpoint.h" +#include "proton/cpp/Connection.h" +#include "proton/cpp/Transport.h" + +namespace proton { +namespace reactor { + +Endpoint::Endpoint() {} + +Endpoint::~Endpoint() {} + +Transport &Endpoint::getTransport() { + return getConnection().getTransport(); +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Event.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Event.cpp b/proton-c/bindings/cpp/src/Event.cpp new file mode 100644 index 0000000..531c764 --- /dev/null +++ b/proton-c/bindings/cpp/src/Event.cpp @@ -0,0 +1,71 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/reactor.h" +#include "proton/event.h" + +#include "proton/cpp/Event.h" +#include "proton/cpp/Handler.h" +#include "proton/cpp/exceptions.h" + +#include "Msg.h" +#include "contexts.h" + +namespace proton { +namespace reactor { + +Event::Event() {} + +Event::~Event() {} + + +Container &Event::getContainer() { + // Subclasses to override as appropriate + throw ProtonException(MSG("No container context for event")); +} + +Connection &Event::getConnection() { + throw ProtonException(MSG("No connection context for Event")); +} + +Sender Event::getSender() { + throw ProtonException(MSG("No Sender context for event")); +} + +Receiver Event::getReceiver() { + throw ProtonException(MSG("No Receiver context for event")); +} + +Link Event::getLink() { + throw ProtonException(MSG("No Link context for event")); +} + +Message Event::getMessage() { + throw ProtonException(MSG("No message associated with event")); +} + +void Event::setMessage(Message &) { + throw ProtonException(MSG("Operation not supported for this type of event")); +} + + + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Handler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Handler.cpp b/proton-c/bindings/cpp/src/Handler.cpp new file mode 100644 index 0000000..4d1b581 --- /dev/null +++ b/proton-c/bindings/cpp/src/Handler.cpp @@ -0,0 +1,44 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/Handler.h" +#include "proton/cpp/Event.h" + +namespace proton { +namespace reactor { + +Handler::Handler(){}; +Handler::~Handler(){}; + +void Handler::onUnhandled(Event &e){}; + +void Handler::addChildHandler(Handler &e) { + childHandlers.push_back(&e); +} + +std::vector<Handler *>::iterator Handler::childHandlersBegin() { + return childHandlers.begin(); +} + +std::vector<Handler *>::iterator Handler::childHandlersEnd() { + return childHandlers.end(); +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Link.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Link.cpp b/proton-c/bindings/cpp/src/Link.cpp new file mode 100644 index 0000000..3356b38 --- /dev/null +++ b/proton-c/bindings/cpp/src/Link.cpp @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/Link.h" +#include "proton/cpp/exceptions.h" +#include "proton/cpp/Connection.h" +#include "ConnectionImpl.h" +#include "Msg.h" +#include "contexts.h" +#include "ProtonImplRef.h" + +#include "proton/connection.h" +#include "proton/session.h" +#include "proton/link.h" + +namespace proton { +namespace reactor { + +namespace { + +static inline void throwIfNull(pn_link_t *l) { if (!l) throw ProtonException(MSG("Disassociated link")); } + +} + +template class ProtonHandle<pn_link_t>; +typedef ProtonImplRef<Link> PI; + +Link::Link(pn_link_t* p) { + verifyType(p); + PI::ctor(*this, p); + if (p) senderLink = pn_link_is_sender(p); +} +Link::Link() { + PI::ctor(*this, 0); +} +Link::Link(const Link& c) : ProtonHandle<pn_link_t>() { + verifyType(impl); + PI::copy(*this, c); + senderLink = c.senderLink; +} +Link& Link::operator=(const Link& c) { + verifyType(impl); + senderLink = c.senderLink; + return PI::assign(*this, c); +} +Link::~Link() { PI::dtor(*this); } + +void Link::verifyType(pn_link_t *l) {} // Generic link can be sender or receiver + +pn_link_t *Link::getPnLink() const { return impl; } + +void Link::open() { + throwIfNull(impl); + pn_link_open(impl); +} + +void Link::close() { + throwIfNull(impl); + pn_link_close(impl); +} + +bool Link::isSender() { + return impl && senderLink; +} + +bool Link::isReceiver() { + return impl && !senderLink; +} + +int Link::getCredit() { + throwIfNull(impl); + return pn_link_credit(impl); +} + +Connection &Link::getConnection() { + throwIfNull(impl); + pn_session_t *s = pn_link_session(impl); + pn_connection_t *c = pn_session_connection(s); + return ConnectionImpl::getReactorReference(c); +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/LogInternal.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/LogInternal.h b/proton-c/bindings/cpp/src/LogInternal.h new file mode 100644 index 0000000..427f90b --- /dev/null +++ b/proton-c/bindings/cpp/src/LogInternal.h @@ -0,0 +1,51 @@ +#ifndef PROTON_CPP_LOG_INTERNAL_H +#define PROTON_CPP_LOG_INTERNAL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "Msg.h" + +namespace proton { +namespace reactor { + +enum Level { trace, debug, info, notice, warning, error, critical }; + +class Logger +{ +public: + // TODO: build out to be ultra configurable as for corresponding QPID class + Statement + static void log(Level level, const char* file, int line, const char* function, const std::string& message); +private: + //This class has only one instance so no need to copy + Logger(); + ~Logger(); + + Logger(const Logger&); + Logger operator=(const Logger&); +}; + +// Just do simple logging for now +#define PN_CPP_LOG(LEVEL, MESSAGE) Logger::log(LEVEL, 0, 0, 0, ::proton::reactor::Msg() << MESSAGE) + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_LOG_INTERNAL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Logger.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Logger.cpp b/proton-c/bindings/cpp/src/Logger.cpp new file mode 100644 index 0000000..2671b2e --- /dev/null +++ b/proton-c/bindings/cpp/src/Logger.cpp @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "LogInternal.h" +#include <cstdlib> + +namespace proton { +namespace reactor { + +namespace { +bool levelSet = false; +Level logLevel = error; +Level getLogLevel() { + if (!levelSet) { + levelSet = true; + const char *l = getenv("PROTON_CPP_LOG_LEVEL"); + if (l && l[0] != 0 && l[1] == 0) { + char low = '0' + trace; + char high = '0' + critical; + if (*l >= low && *l <= high) + logLevel = (Level) (*l - '0'); + } + } + return logLevel; +} + +} // namespace + + +void Logger::log(Level level, const char* file, int line, const char* function, const std::string& message) +{ + if (level >= getLogLevel()) { + std::cout << message << std::endl; + std::cout.flush(); + } +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Message.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Message.cpp b/proton-c/bindings/cpp/src/Message.cpp new file mode 100644 index 0000000..840a10b --- /dev/null +++ b/proton-c/bindings/cpp/src/Message.cpp @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/Message.h" +#include "proton/cpp/exceptions.h" +#include "Msg.h" + +namespace proton { +namespace reactor { + +Message::Message() : pnMessage(pn_message()){} + +Message::~Message() { + pn_decref(pnMessage); +} + +Message::Message(const Message& m) : pnMessage(m.pnMessage) { + pn_incref(pnMessage); +} + +Message& Message::operator=(const Message& m) { + pnMessage = m.pnMessage; + pn_incref(pnMessage); + return *this; +} + +void Message::setBody(const std::string &buf) { + pn_data_t *body = pn_message_body(pnMessage); + pn_data_put_string(body, pn_bytes(buf.size(), buf.data())); +} + +std::string Message::getBody() { + pn_data_t *body = pn_message_body(pnMessage); + if (pn_data_next(body) && pn_data_type(body) == PN_STRING) { + pn_bytes_t bytes= pn_data_get_string(body); + if (!pn_data_next(body)) { + // String data and nothing else + return std::string(bytes.start, bytes.size); + } + } + + pn_data_rewind(body); + std::string str; + size_t sz = 1024; + str.resize(sz); + int err = pn_data_format(body, (char *) str.data(), &sz); + if (err == PN_OVERFLOW) + throw ProtonException(MSG("TODO: sizing loop missing")); + if (err) throw ProtonException(MSG("Unexpected data error")); + str.resize(sz); + return str; +} + +void Message::encode(std::string &s) { + size_t sz = 1024; + if (s.capacity() > sz) + sz = s.capacity(); + else + s.reserve(sz); + s.resize(sz); + int err = pn_message_encode(pnMessage, (char *) s.data(), &sz); + if (err == PN_OVERFLOW) + throw ProtonException(MSG("TODO: fix overflow with dynamic buffer resizing")); + if (err) throw ProtonException(MSG("unexpected error")); + s.resize(sz); +} + +void Message::decode(const std::string &s) { + int err = pn_message_decode(pnMessage, s.data(), s.size()); + if (err) throw ProtonException(MSG("unexpected error")); +} + + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/MessagingAdapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/MessagingAdapter.cpp b/proton-c/bindings/cpp/src/MessagingAdapter.cpp new file mode 100644 index 0000000..9cab2b3 --- /dev/null +++ b/proton-c/bindings/cpp/src/MessagingAdapter.cpp @@ -0,0 +1,191 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/MessagingAdapter.h" +#include "proton/cpp/MessagingEvent.h" +#include "proton/cpp/Sender.h" +#include "proton/cpp/exceptions.h" +#include "Msg.h" + +#include "proton/link.h" +#include "proton/handlers.h" +#include "proton/delivery.h" +#include "proton/connection.h" + +namespace proton { +namespace reactor { + +MessagingAdapter::MessagingAdapter(MessagingHandler &d) : delegate(d), handshaker(pn_handshaker()) { + pn_handler_t *flowcontroller = pn_flowcontroller(10); + pn_handler_add(handshaker, flowcontroller); + pn_decref(flowcontroller); +}; +MessagingAdapter::~MessagingAdapter(){ + pn_decref(handshaker); +}; + +void MessagingAdapter::onReactorInit(Event &e) { + // create onStart extended event + MessagingEvent mevent(PN_MESSAGING_START, NULL, e.getContainer()); + mevent.dispatch(delegate); +} + +void MessagingAdapter::onLinkFlow(Event &e) { + ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e); + if (pe) { + pn_event_t *pne = pe->getPnEvent(); + pn_link_t *lnk = pn_event_link(pne); + if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0) { + // create onMessage extended event + MessagingEvent mevent(PN_MESSAGING_SENDABLE, pe, e.getContainer()); + mevent.dispatch(delegate); + } + } +} + +namespace { +Message receiveMessage(pn_link_t *lnk, pn_delivery_t *dlv) { + std::string buf; + size_t sz = pn_delivery_pending(dlv); + buf.resize(sz); + ssize_t n = pn_link_recv(lnk, (char *) buf.data(), sz); + if (n != (ssize_t) sz) + throw ProtonException(MSG("link read failure")); + Message m; + m. decode(buf); + pn_link_advance(lnk); + return m; +} +} // namespace + +void MessagingAdapter::onDelivery(Event &e) { + ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e); + if (pe) { + pn_event_t *cevent = pe->getPnEvent(); + pn_link_t *lnk = pn_event_link(cevent); + pn_delivery_t *dlv = pn_event_delivery(cevent); + + if (pn_link_is_receiver(lnk)) { + if (!pn_delivery_partial(dlv) && pn_delivery_readable(dlv)) { + // generate onMessage + MessagingEvent mevent(PN_MESSAGING_MESSAGE, pe, pe->getContainer()); + Message m(receiveMessage(lnk, dlv)); + mevent.setMessage(m); + // TODO: check if endpoint closed... + mevent.dispatch(delegate); + // only do auto accept for now + pn_delivery_update(dlv, PN_ACCEPTED); + pn_delivery_settle(dlv); + // TODO: generate onSettled + } + } else { + // Sender + if (pn_delivery_updated(dlv)) { + uint64_t rstate = pn_delivery_remote_state(dlv); + if (rstate == PN_ACCEPTED) + // generate onAccepted + MessagingEvent(PN_MESSAGING_ACCEPTED, pe, pe->getContainer()).dispatch(delegate); + else if (rstate = PN_REJECTED) + MessagingEvent(PN_MESSAGING_REJECTED, pe, pe->getContainer()).dispatch(delegate); + else if (rstate == PN_RELEASED || rstate == PN_MODIFIED) + MessagingEvent(PN_MESSAGING_RELEASED, pe, pe->getContainer()).dispatch(delegate); + + if (pn_delivery_settled(dlv)) + MessagingEvent(PN_MESSAGING_SETTLED, pe, pe->getContainer()).dispatch(delegate); + + pn_delivery_settle(dlv); // TODO: only if auto settled + } + } + } +} + +namespace { + +bool isLocalOpen(pn_state_t state) { + return state & PN_LOCAL_ACTIVE; +} + +bool isLocalUnititialised(pn_state_t state) { + return state & PN_LOCAL_UNINIT; +} + +bool isLocalClosed(pn_state_t state) { + return state & PN_LOCAL_CLOSED; +} + +bool isRemoteOpen(pn_state_t state) { + return state & PN_REMOTE_ACTIVE; +} + +bool isRemoteClosed(pn_state_t state) { + return state & PN_REMOTE_CLOSED; +} + +} // namespace + +void MessagingAdapter::onConnectionRemoteClose(Event &e) { + ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e); + if (pe) { + pn_event_t *cevent = pe->getPnEvent(); + pn_connection_t *conn = pn_event_connection(cevent); + // TODO: remote condition -> error + if (isLocalClosed(pn_connection_state(conn))) { + MessagingEvent(PN_MESSAGING_CONNECTION_CLOSED, pe, pe->getContainer()).dispatch(delegate); + } + else { + MessagingEvent(PN_MESSAGING_CONNECTION_CLOSING, pe, pe->getContainer()).dispatch(delegate); + } + pn_connection_close(conn); + } +} + + +void MessagingAdapter::onLinkRemoteOpen(Event &e) { + ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e); + if (pe) { + pn_event_t *cevent = pe->getPnEvent(); + pn_link_t *link = pn_event_link(cevent); + // TODO: remote condition -> error + if (isLocalOpen(pn_link_state(link))) { + MessagingEvent(PN_MESSAGING_LINK_OPENED, pe, pe->getContainer()).dispatch(delegate); + } + else if (isLocalUnititialised(pn_link_state(link))) { + MessagingEvent(PN_MESSAGING_LINK_OPENING, pe, pe->getContainer()).dispatch(delegate); + pn_link_open(link); + } + } +} + + +void MessagingAdapter::onUnhandled(Event &e) { + // Until this code fleshes out closer to python's, cheat a bit with a pn_handshaker + + ProtonEvent *pe = dynamic_cast<ProtonEvent*>(&e); + if (pe) { + pn_event_type_t type = (pn_event_type_t) pe->getType(); + if (type != PN_EVENT_NONE) { + pn_handler_dispatch(handshaker, pe->getPnEvent(), type); + } + } +} + + + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/MessagingEvent.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/MessagingEvent.cpp b/proton-c/bindings/cpp/src/MessagingEvent.cpp new file mode 100644 index 0000000..bcfb721 --- /dev/null +++ b/proton-c/bindings/cpp/src/MessagingEvent.cpp @@ -0,0 +1,133 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/reactor.h" +#include "proton/event.h" +#include "proton/link.h" + +#include "proton/cpp/MessagingEvent.h" +#include "proton/cpp/ProtonHandler.h" +#include "proton/cpp/MessagingHandler.h" +#include "proton/cpp/exceptions.h" +#include "Msg.h" +#include "contexts.h" + +namespace proton { +namespace reactor { + +MessagingEvent::MessagingEvent(pn_event_t *ce, pn_event_type_t t, Container &c) : + ProtonEvent(ce, t, c), messagingType(PN_MESSAGING_PROTON), parentEvent(0), message(0) +{} + +MessagingEvent::MessagingEvent(MessagingEventType_t t, ProtonEvent *p, Container &c) : + ProtonEvent(NULL, PN_EVENT_NONE, c), messagingType(t), parentEvent(p), message(0) { + if (messagingType == PN_MESSAGING_PROTON) + throw ProtonException(MSG("invalid messaging event type")); +} + +MessagingEvent::~MessagingEvent() { + delete message; +} + +Connection &MessagingEvent::getConnection() { + if (messagingType == PN_MESSAGING_PROTON) + return ProtonEvent::getConnection(); + if (parentEvent) + return parentEvent->getConnection(); + throw ProtonException(MSG("No connection context for event")); +} + +Sender MessagingEvent::getSender() { + if (messagingType == PN_MESSAGING_PROTON) + return ProtonEvent::getSender(); + if (parentEvent) + return parentEvent->getSender(); + throw ProtonException(MSG("No sender context for event")); +} + +Receiver MessagingEvent::getReceiver() { + if (messagingType == PN_MESSAGING_PROTON) + return ProtonEvent::getReceiver(); + if (parentEvent) + return parentEvent->getReceiver(); + throw ProtonException(MSG("No receiver context for event")); +} + +Link MessagingEvent::getLink() { + if (messagingType == PN_MESSAGING_PROTON) + return ProtonEvent::getLink(); + if (parentEvent) + return parentEvent->getLink(); + throw ProtonException(MSG("No link context for event")); +} + +Message MessagingEvent::getMessage() { + if (message) + return *message; + throw ProtonException(MSG("No message context for event")); +} + +void MessagingEvent::setMessage(Message &m) { + if (messagingType != PN_MESSAGING_MESSAGE) + throw ProtonException(MSG("Event type does not provide message")); + delete message; + message = new Message(m); +} + +void MessagingEvent::dispatch(Handler &h) { + if (messagingType == PN_MESSAGING_PROTON) { + ProtonEvent::dispatch(h); + return; + } + + MessagingHandler *handler = dynamic_cast<MessagingHandler*>(&h); + if (handler) { + switch(messagingType) { + + case PN_MESSAGING_START: handler->onStart(*this); break; + case PN_MESSAGING_SENDABLE: handler->onSendable(*this); break; + case PN_MESSAGING_MESSAGE: handler->onMessage(*this); break; + case PN_MESSAGING_ACCEPTED: handler->onAccepted(*this); break; + case PN_MESSAGING_REJECTED: handler->onRejected(*this); break; + case PN_MESSAGING_RELEASED: handler->onReleased(*this); break; + case PN_MESSAGING_SETTLED: handler->onSettled(*this); break; + + case PN_MESSAGING_CONNECTION_CLOSING: handler->onConnectionClosing(*this); break; + case PN_MESSAGING_CONNECTION_CLOSED: handler->onConnectionClosed(*this); break; + case PN_MESSAGING_LINK_OPENING: handler->onLinkOpening(*this); break; + case PN_MESSAGING_LINK_OPENED: handler->onLinkOpened(*this); break; + + default: + throw ProtonException(MSG("Unkown messaging event type " << messagingType)); + break; + } + } else { + h.onUnhandled(*this); + } + + // recurse through children + for (std::vector<Handler *>::iterator child = h.childHandlersBegin(); + child != h.childHandlersEnd(); ++child) { + dispatch(**child); + } +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/MessagingHandler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/MessagingHandler.cpp b/proton-c/bindings/cpp/src/MessagingHandler.cpp new file mode 100644 index 0000000..7cb48f6 --- /dev/null +++ b/proton-c/bindings/cpp/src/MessagingHandler.cpp @@ -0,0 +1,60 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/MessagingHandler.h" +#include "proton/cpp/Event.h" + +namespace proton { +namespace reactor { + +MessagingHandler::MessagingHandler(){}; +MessagingHandler::~MessagingHandler(){}; + +void MessagingHandler::onAbort(Event &e) { onUnhandled(e); } +void MessagingHandler::onAccepted(Event &e) { onUnhandled(e); } +void MessagingHandler::onCommit(Event &e) { onUnhandled(e); } +void MessagingHandler::onConnectionClose(Event &e) { onUnhandled(e); } +void MessagingHandler::onConnectionClosed(Event &e) { onUnhandled(e); } +void MessagingHandler::onConnectionClosing(Event &e) { onUnhandled(e); } +void MessagingHandler::onConnectionOpen(Event &e) { onUnhandled(e); } +void MessagingHandler::onConnectionOpened(Event &e) { onUnhandled(e); } +void MessagingHandler::onDisconnected(Event &e) { onUnhandled(e); } +void MessagingHandler::onFetch(Event &e) { onUnhandled(e); } +void MessagingHandler::onIdLoaded(Event &e) { onUnhandled(e); } +void MessagingHandler::onLinkClosing(Event &e) { onUnhandled(e); } +void MessagingHandler::onLinkOpened(Event &e) { onUnhandled(e); } +void MessagingHandler::onLinkOpening(Event &e) { onUnhandled(e); } +void MessagingHandler::onMessage(Event &e) { onUnhandled(e); } +void MessagingHandler::onQuit(Event &e) { onUnhandled(e); } +void MessagingHandler::onRecordInserted(Event &e) { onUnhandled(e); } +void MessagingHandler::onRecordsLoaded(Event &e) { onUnhandled(e); } +void MessagingHandler::onRejected(Event &e) { onUnhandled(e); } +void MessagingHandler::onReleased(Event &e) { onUnhandled(e); } +void MessagingHandler::onRequest(Event &e) { onUnhandled(e); } +void MessagingHandler::onResponse(Event &e) { onUnhandled(e); } +void MessagingHandler::onSendable(Event &e) { onUnhandled(e); } +void MessagingHandler::onSettled(Event &e) { onUnhandled(e); } +void MessagingHandler::onStart(Event &e) { onUnhandled(e); } +void MessagingHandler::onTimer(Event &e) { onUnhandled(e); } +void MessagingHandler::onTransactionAborted(Event &e) { onUnhandled(e); } +void MessagingHandler::onTransactionCommitted(Event &e) { onUnhandled(e); } +void MessagingHandler::onTransactionDeclared(Event &e) { onUnhandled(e); } + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Msg.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Msg.h b/proton-c/bindings/cpp/src/Msg.h new file mode 100644 index 0000000..1168d35 --- /dev/null +++ b/proton-c/bindings/cpp/src/Msg.h @@ -0,0 +1,79 @@ +#ifndef PROTON_MSG_H +#define PROTON_MSG_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <sstream> +#include <iostream> + +namespace proton { +namespace reactor { + +/** A simple wrapper for std::ostringstream that allows + * in place construction of a message and automatic conversion + * to string. + * E.g. + *@code + * void foo(const std::string&); + * foo(Msg() << "hello " << 32); + *@endcode + * Will construct the string "hello 32" and pass it to foo() + */ +struct Msg { + std::ostringstream os; + Msg() {} + Msg(const Msg& m) : os(m.str()) {} + std::string str() const { return os.str(); } + operator std::string() const { return str(); } + + Msg& operator<<(long n) { os << n; return *this; } + Msg& operator<<(unsigned long n) { os << n; return *this; } + Msg& operator<<(bool n) { os << n; return *this; } + Msg& operator<<(short n) { os << n; return *this; } + Msg& operator<<(unsigned short n) { os << n; return *this; } + Msg& operator<<(int n) { os << n; return *this; } + Msg& operator<<(unsigned int n) { os << n; return *this; } +#ifdef _GLIBCXX_USE_LONG_LONG + Msg& operator<<(long long n) { os << n; return *this; } + Msg& operator<<(unsigned long long n) { os << n; return *this; } +#endif + Msg& operator<<(double n) { os << n; return *this; } + Msg& operator<<(float n) { os << n; return *this; } + Msg& operator<<(long double n) { os << n; return *this; } + + template <class T> Msg& operator<<(const T& t) { os <<t; return *this; } +}; + + + +inline std::ostream& operator<<(std::ostream& o, const Msg& m) { + return o << m.str(); +} + +/** Construct a message using operator << and append (file:line) */ +#define QUOTE_(x) #x +#define QUOTE(x) QUOTE_(x) +#define MSG(message) (::proton::reactor::Msg() << message << " (" __FILE__ ":" QUOTE(__LINE__) ")") + +}} // namespace proton::reactor + +#endif /*!PROTON_MSG_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/PrivateImplRef.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/PrivateImplRef.h b/proton-c/bindings/cpp/src/PrivateImplRef.h new file mode 100644 index 0000000..a2dccc2 --- /dev/null +++ b/proton-c/bindings/cpp/src/PrivateImplRef.h @@ -0,0 +1,97 @@ +#ifndef PROTON_CPP_PRIVATEIMPL_H +#define PROTON_CPP_PRIVATEIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/ImportExport.h" + +namespace proton { +namespace reactor { + +// Modified from qpid::messaging version to work without +// boost::intrusive_ptr but integrate with Proton's pn_class_t +// reference counting. Thread safety currently absent but fluid in +// intention... + + +/** + * Helper class to implement a class with a private, reference counted + * implementation and reference semantics. + * + * Such classes are used in the public API to hide implementation, they + * should. Example of use: + * + * === Foo.h + * + * template <class T> PrivateImplRef; + * class FooImpl; + * + * Foo : public Handle<FooImpl> { + * public: + * Foo(FooImpl* = 0); + * Foo(const Foo&); + * ~Foo(); + * Foo& operator=(const Foo&); + * + * int fooDo(); // and other Foo functions... + * + * private: + * typedef FooImpl Impl; + * Impl* impl; + * friend class PrivateImplRef<Foo>; + * + * === Foo.cpp + * + * typedef PrivateImplRef<Foo> PI; + * Foo::Foo(FooImpl* p) { PI::ctor(*this, p); } + * Foo::Foo(const Foo& c) : Handle<FooImpl>() { PI::copy(*this, c); } + * Foo::~Foo() { PI::dtor(*this); } + * Foo& Foo::operator=(const Foo& c) { return PI::assign(*this, c); } + * + * int foo::fooDo() { return impl->fooDo(); } + * + */ +template <class T> class PrivateImplRef { + public: + typedef typename T::Impl Impl; + + /** Get the implementation pointer from a handle */ + static Impl* get(const T& t) { return t.impl; } + + /** Set the implementation pointer in a handle */ + static void set(T& t, const Impl* p) { + if (t.impl == p) return; + if (t.impl) Impl::decref(t.impl); + t.impl = const_cast<Impl *>(p); + if (t.impl) Impl::incref(t.impl); + } + + // Helper functions to implement the ctor, dtor, copy, assign + static void ctor(T& t, Impl* p) { t.impl = p; if (p) Impl::incref(p); } + static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); } + static void dtor(T& t) { if(t.impl) Impl::decref(t.impl); } + static T& assign(T& t, const T& x) { set(t, get(x)); return t;} +}; + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_PRIVATEIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/ProtonEvent.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ProtonEvent.cpp b/proton-c/bindings/cpp/src/ProtonEvent.cpp new file mode 100644 index 0000000..16b92b6 --- /dev/null +++ b/proton-c/bindings/cpp/src/ProtonEvent.cpp @@ -0,0 +1,152 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/reactor.h" +#include "proton/event.h" +#include "proton/link.h" + +#include "proton/cpp/ProtonEvent.h" +#include "proton/cpp/ProtonHandler.h" +#include "proton/cpp/exceptions.h" +#include "proton/cpp/Container.h" + +#include "ConnectionImpl.h" +#include "Msg.h" +#include "contexts.h" + +namespace proton { +namespace reactor { + +ProtonEvent::ProtonEvent(pn_event_t *ce, pn_event_type_t t, Container &c) : + pnEvent(ce), + type((int) t), + container(c) +{} + +int ProtonEvent::getType() { return type; } + +pn_event_t *ProtonEvent::getPnEvent() { return pnEvent; } + +Container &ProtonEvent::getContainer() { return container; } + +Connection &ProtonEvent::getConnection() { + pn_connection_t *conn = pn_event_connection(getPnEvent()); + if (!conn) + throw ProtonException(MSG("No connection context for this event")); + return ConnectionImpl::getReactorReference(conn); +} + +Sender ProtonEvent::getSender() { + pn_link_t *lnk = pn_event_link(getPnEvent()); + if (lnk && pn_link_is_sender(lnk)) + return Sender(lnk); + throw ProtonException(MSG("No sender context for this event")); +} + +Receiver ProtonEvent::getReceiver() { + pn_link_t *lnk = pn_event_link(getPnEvent()); + if (lnk && pn_link_is_receiver(lnk)) + return Receiver(lnk); + throw ProtonException(MSG("No receiver context for this event")); +} + +Link ProtonEvent::getLink() { + pn_link_t *lnk = pn_event_link(getPnEvent()); + if (lnk) + if (pn_link_is_sender(lnk)) + return Sender(lnk); + else + return Receiver(lnk); + throw ProtonException(MSG("No link context for this event")); +} + + + + +void ProtonEvent::dispatch(Handler &h) { + ProtonHandler *handler = dynamic_cast<ProtonHandler*>(&h); + + if (handler) { + switch(type) { + + case PN_REACTOR_INIT: handler->onReactorInit(*this); break; + case PN_REACTOR_QUIESCED: handler->onReactorQuiesced(*this); break; + case PN_REACTOR_FINAL: handler->onReactorFinal(*this); break; + + case PN_TIMER_TASK: handler->onTimerTask(*this); break; + + case PN_CONNECTION_INIT: handler->onConnectionInit(*this); break; + case PN_CONNECTION_BOUND: handler->onConnectionBound(*this); break; + case PN_CONNECTION_UNBOUND: handler->onConnectionUnbound(*this); break; + case PN_CONNECTION_LOCAL_OPEN: handler->onConnectionLocalOpen(*this); break; + case PN_CONNECTION_LOCAL_CLOSE: handler->onConnectionLocalClose(*this); break; + case PN_CONNECTION_REMOTE_OPEN: handler->onConnectionRemoteOpen(*this); break; + case PN_CONNECTION_REMOTE_CLOSE: handler->onConnectionRemoteClose(*this); break; + case PN_CONNECTION_FINAL: handler->onConnectionFinal(*this); break; + + case PN_SESSION_INIT: handler->onSessionInit(*this); break; + case PN_SESSION_LOCAL_OPEN: handler->onSessionLocalOpen(*this); break; + case PN_SESSION_LOCAL_CLOSE: handler->onSessionLocalClose(*this); break; + case PN_SESSION_REMOTE_OPEN: handler->onSessionRemoteOpen(*this); break; + case PN_SESSION_REMOTE_CLOSE: handler->onSessionRemoteClose(*this); break; + case PN_SESSION_FINAL: handler->onSessionFinal(*this); break; + + case PN_LINK_INIT: handler->onLinkInit(*this); break; + case PN_LINK_LOCAL_OPEN: handler->onLinkLocalOpen(*this); break; + case PN_LINK_LOCAL_CLOSE: handler->onLinkLocalClose(*this); break; + case PN_LINK_LOCAL_DETACH: handler->onLinkLocalDetach(*this); break; + case PN_LINK_REMOTE_OPEN: handler->onLinkRemoteOpen(*this); break; + case PN_LINK_REMOTE_CLOSE: handler->onLinkRemoteClose(*this); break; + case PN_LINK_REMOTE_DETACH: handler->onLinkRemoteDetach(*this); break; + case PN_LINK_FLOW: handler->onLinkFlow(*this); break; + case PN_LINK_FINAL: handler->onLinkFinal(*this); break; + + case PN_DELIVERY: handler->onDelivery(*this); break; + + case PN_TRANSPORT: handler->onTransport(*this); break; + case PN_TRANSPORT_ERROR: handler->onTransportError(*this); break; + case PN_TRANSPORT_HEAD_CLOSED: handler->onTransportHeadClosed(*this); break; + case PN_TRANSPORT_TAIL_CLOSED: handler->onTransportTailClosed(*this); break; + case PN_TRANSPORT_CLOSED: handler->onTransportClosed(*this); break; + + case PN_SELECTABLE_INIT: handler->onSelectableInit(*this); break; + case PN_SELECTABLE_UPDATED: handler->onSelectableUpdated(*this); break; + case PN_SELECTABLE_READABLE: handler->onSelectableReadable(*this); break; + case PN_SELECTABLE_WRITABLE: handler->onSelectableWritable(*this); break; + case PN_SELECTABLE_EXPIRED: handler->onSelectableExpired(*this); break; + case PN_SELECTABLE_ERROR: handler->onSelectableError(*this); break; + case PN_SELECTABLE_FINAL: handler->onSelectableFinal(*this); break; + default: + throw ProtonException(MSG("Invalid Proton event type " << type)); + break; + } + } else { + h.onUnhandled(*this); + } + + // recurse through children + for (std::vector<Handler *>::iterator child = h.childHandlersBegin(); + child != h.childHandlersEnd(); ++child) { + dispatch(**child); + } +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/ProtonHandler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ProtonHandler.cpp b/proton-c/bindings/cpp/src/ProtonHandler.cpp new file mode 100644 index 0000000..83d9087 --- /dev/null +++ b/proton-c/bindings/cpp/src/ProtonHandler.cpp @@ -0,0 +1,74 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/ProtonHandler.h" +#include "proton/cpp/ProtonEvent.h" + +namespace proton { +namespace reactor { + +ProtonHandler::ProtonHandler(){}; + +// Everything goes to onUnhandled() unless overriden by subclass + +void ProtonHandler::onReactorInit(Event &e) { onUnhandled(e); } +void ProtonHandler::onReactorQuiesced(Event &e) { onUnhandled(e); } +void ProtonHandler::onReactorFinal(Event &e) { onUnhandled(e); } +void ProtonHandler::onTimerTask(Event &e) { onUnhandled(e); } +void ProtonHandler::onConnectionInit(Event &e) { onUnhandled(e); } +void ProtonHandler::onConnectionBound(Event &e) { onUnhandled(e); } +void ProtonHandler::onConnectionUnbound(Event &e) { onUnhandled(e); } +void ProtonHandler::onConnectionLocalOpen(Event &e) { onUnhandled(e); } +void ProtonHandler::onConnectionLocalClose(Event &e) { onUnhandled(e); } +void ProtonHandler::onConnectionRemoteOpen(Event &e) { onUnhandled(e); } +void ProtonHandler::onConnectionRemoteClose(Event &e) { onUnhandled(e); } +void ProtonHandler::onConnectionFinal(Event &e) { onUnhandled(e); } +void ProtonHandler::onSessionInit(Event &e) { onUnhandled(e); } +void ProtonHandler::onSessionLocalOpen(Event &e) { onUnhandled(e); } +void ProtonHandler::onSessionLocalClose(Event &e) { onUnhandled(e); } +void ProtonHandler::onSessionRemoteOpen(Event &e) { onUnhandled(e); } +void ProtonHandler::onSessionRemoteClose(Event &e) { onUnhandled(e); } +void ProtonHandler::onSessionFinal(Event &e) { onUnhandled(e); } +void ProtonHandler::onLinkInit(Event &e) { onUnhandled(e); } +void ProtonHandler::onLinkLocalOpen(Event &e) { onUnhandled(e); } +void ProtonHandler::onLinkLocalClose(Event &e) { onUnhandled(e); } +void ProtonHandler::onLinkLocalDetach(Event &e) { onUnhandled(e); } +void ProtonHandler::onLinkRemoteOpen(Event &e) { onUnhandled(e); } +void ProtonHandler::onLinkRemoteClose(Event &e) { onUnhandled(e); } +void ProtonHandler::onLinkRemoteDetach(Event &e) { onUnhandled(e); } +void ProtonHandler::onLinkFlow(Event &e) { onUnhandled(e); } +void ProtonHandler::onLinkFinal(Event &e) { onUnhandled(e); } +void ProtonHandler::onDelivery(Event &e) { onUnhandled(e); } +void ProtonHandler::onTransport(Event &e) { onUnhandled(e); } +void ProtonHandler::onTransportError(Event &e) { onUnhandled(e); } +void ProtonHandler::onTransportHeadClosed(Event &e) { onUnhandled(e); } +void ProtonHandler::onTransportTailClosed(Event &e) { onUnhandled(e); } +void ProtonHandler::onTransportClosed(Event &e) { onUnhandled(e); } +void ProtonHandler::onSelectableInit(Event &e) { onUnhandled(e); } +void ProtonHandler::onSelectableUpdated(Event &e) { onUnhandled(e); } +void ProtonHandler::onSelectableReadable(Event &e) { onUnhandled(e); } +void ProtonHandler::onSelectableWritable(Event &e) { onUnhandled(e); } +void ProtonHandler::onSelectableExpired(Event &e) { onUnhandled(e); } +void ProtonHandler::onSelectableError(Event &e) { onUnhandled(e); } +void ProtonHandler::onSelectableFinal(Event &e) { onUnhandled(e); } + +void ProtonHandler::onUnhandled(Event &e) {} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/ProtonImplRef.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/ProtonImplRef.h b/proton-c/bindings/cpp/src/ProtonImplRef.h new file mode 100644 index 0000000..8f9f360 --- /dev/null +++ b/proton-c/bindings/cpp/src/ProtonImplRef.h @@ -0,0 +1,66 @@ +#ifndef PROTON_CPP_PROTONIMPL_H +#define PROTON_CPP_PROTONIMPL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/ImportExport.h" +#include "proton/object.h" + +namespace proton { +namespace reactor { + +// Modified from qpid::messaging version to work without +// boost::intrusive_ptr but integrate with Proton's pn_class_t +// reference counting. Thread safety currently absent but fluid in +// intention... + + +/** + * See PrivateImplRef.h This is for lightly wrapped Proton pn_object_t targets. + * class Foo : ProtonHandle<pn_foo_t> {...} + */ + +template <class T> class ProtonImplRef { + public: + typedef typename T::Impl Impl; + + /** Get the implementation pointer from a handle */ + static Impl* get(const T& t) { return t.impl; } + + /** Set the implementation pointer in a handle */ + static void set(T& t, const Impl* p) { + if (t.impl == p) return; + if (t.impl) pn_decref(t.impl); + t.impl = const_cast<Impl *>(p); + if (t.impl) pn_incref(t.impl); + } + + // Helper functions to implement the ctor, dtor, copy, assign + static void ctor(T& t, Impl* p) { t.impl = p; if (p) pn_incref(p); } + static void copy(T& t, const T& x) { if (&t == &x) return; t.impl = 0; assign(t, x); } + static void dtor(T& t) { if(t.impl) pn_decref(t.impl); } + static T& assign(T& t, const T& x) { set(t, get(x)); return t;} +}; + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_PROTONIMPL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Receiver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Receiver.cpp b/proton-c/bindings/cpp/src/Receiver.cpp new file mode 100644 index 0000000..557e736 --- /dev/null +++ b/proton-c/bindings/cpp/src/Receiver.cpp @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/Link.h" +#include "proton/cpp/Receiver.h" +#include "proton/cpp/exceptions.h" +#include "Msg.h" + +#include "proton/connection.h" +#include "proton/session.h" +#include "proton/link.h" + +namespace proton { +namespace reactor { + + +Receiver::Receiver(pn_link_t *lnk) : Link(lnk) {} +Receiver::Receiver() : Link(0) {} + +void Receiver::verifyType(pn_link_t *lnk) { + if (lnk && pn_link_is_sender(lnk)) + throw ProtonException(MSG("Creating receiver with sender context")); +} + + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Sender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Sender.cpp b/proton-c/bindings/cpp/src/Sender.cpp new file mode 100644 index 0000000..74d4b0f --- /dev/null +++ b/proton-c/bindings/cpp/src/Sender.cpp @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/Link.h" +#include "proton/cpp/Sender.h" +#include "proton/cpp/exceptions.h" +#include "Msg.h" +#include "contexts.h" + +#include "proton/connection.h" +#include "proton/session.h" +#include "proton/link.h" +#include "proton/types.h" +#include "proton/codec.h" +#include "proton/message.h" +#include "proton/delivery.h" +#include <stdlib.h> +#include <string.h> + +namespace proton { +namespace reactor { + + +Sender::Sender(pn_link_t *lnk = 0) : Link(lnk) {} +Sender::Sender() : Link(0) {} + +void Sender::verifyType(pn_link_t *lnk) { + if (lnk && pn_link_is_receiver(lnk)) + throw ProtonException(MSG("Creating sender with receiver context")); +} + +namespace{ +// revisit if thread safety required +uint64_t tagCounter = 0; +} + +void Sender::send(Message &message) { + char tag[8]; + void *ptr = &tag; + uint64_t id = ++tagCounter; + *((uint64_t *) ptr) = id; + pn_delivery_t *dlv = pn_delivery(getPnLink(), pn_dtag(tag, 8)); + std::string buf; + message.encode(buf); + pn_link_t *link = getPnLink(); + pn_link_send(link, buf.data(), buf.size()); + pn_link_advance(link); + if (pn_link_snd_settle_mode(link) == PN_SND_SETTLED) + pn_delivery_settle(dlv); +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Session.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Session.cpp b/proton-c/bindings/cpp/src/Session.cpp new file mode 100644 index 0000000..d2b01dd --- /dev/null +++ b/proton-c/bindings/cpp/src/Session.cpp @@ -0,0 +1,64 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/Session.h" +#include "contexts.h" + +#include "proton/connection.h" +#include "proton/session.h" +#include "proton/cpp/Session.h" +#include "proton/cpp/Connection.h" +#include "ConnectionImpl.h" + +namespace proton { +namespace reactor { + + +Session::Session(pn_session_t *s) : pnSession(s) +{ + pn_incref(pnSession); +} + +Session::~Session() { + pn_decref(pnSession); +} + +pn_session_t *Session::getPnSession() { return pnSession; } + +void Session::open() { + pn_session_open(pnSession); +} + +Connection &Session::getConnection() { + pn_connection_t *c = pn_session_connection(pnSession); + return ConnectionImpl::getReactorReference(c); +} + +Receiver Session::createReceiver(std::string name) { + pn_link_t *link = pn_receiver(pnSession, name.c_str()); + return Receiver(link); +} + +Sender Session::createSender(std::string name) { + pn_link_t *link = pn_sender(pnSession, name.c_str()); + return Sender(link); +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Transport.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Transport.cpp b/proton-c/bindings/cpp/src/Transport.cpp new file mode 100644 index 0000000..7f22b84 --- /dev/null +++ b/proton-c/bindings/cpp/src/Transport.cpp @@ -0,0 +1,39 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/Transport.h" +#include "proton/cpp/Connection.h" + +#include "proton/transport.h" + +namespace proton { +namespace reactor { + + +Transport::Transport() : connection(0), pnTransport(pn_transport()) {} ; + +Transport::~Transport() { pn_decref(pnTransport); } + +void Transport::bind(Connection &c) { + connection = &c; + pn_transport_bind(pnTransport, c.getPnConnection()); +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Url.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Url.cpp b/proton-c/bindings/cpp/src/Url.cpp new file mode 100644 index 0000000..058bd2f --- /dev/null +++ b/proton-c/bindings/cpp/src/Url.cpp @@ -0,0 +1,77 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/exceptions.h" +#include "Url.h" +#include "ProtonImplRef.h" +#include "Msg.h" + +namespace proton { +namespace reactor { + +template class ProtonHandle<pn_url_t>; +typedef ProtonImplRef<Url> PI; + + +Url::Url(const std::string &url) { + pn_url_t *up = pn_url_parse(url.c_str()); + // refcount is 1, no need to incref + if (!up) + throw ProtonException(MSG("invalid URL: " << url)); + impl = up; +} + +Url::~Url() { PI::dtor(*this); } + +Url::Url(const Url& c) : ProtonHandle<pn_url_t>() { + PI::copy(*this, c); +} + +Url& Url::operator=(const Url& c) { + return PI::assign(*this, c); +} + +std::string Url::getPort() { + const char *p = pn_url_get_port(impl); + if (!p) + return std::string("5672"); + else + return std::string(p); +} + +std::string Url::getHost() { + const char *p = pn_url_get_host(impl); + if (!p) + return std::string("0.0.0.0"); + else + return std::string(p); +} + +std::string Url::getPath() { + const char *p = pn_url_get_path(impl); + if (!p) + return std::string(""); + else + return std::string(p); +} + + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/Url.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/Url.h b/proton-c/bindings/cpp/src/Url.h new file mode 100644 index 0000000..042ab45 --- /dev/null +++ b/proton-c/bindings/cpp/src/Url.h @@ -0,0 +1,49 @@ +#ifndef PROTON_CPP_URL_H +#define PROTON_CPP_URL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/ImportExport.h" +#include "proton/cpp/ProtonHandle.h" +#include "proton/url.h" +#include <string> + +namespace proton { +namespace reactor { + +class Url : public ProtonHandle<pn_url_t> +{ + public: + PROTON_CPP_EXTERN Url(const std::string &url); + PROTON_CPP_EXTERN ~Url(); + PROTON_CPP_EXTERN Url(const Url&); + PROTON_CPP_EXTERN Url& operator=(const Url&); + PROTON_CPP_EXTERN std::string getHost(); + PROTON_CPP_EXTERN std::string getPort(); + PROTON_CPP_EXTERN std::string getPath(); + private: + friend class ProtonImplRef<Url>; +}; + + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_URL_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/contexts.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.cpp b/proton-c/bindings/cpp/src/contexts.cpp new file mode 100644 index 0000000..b1dec49 --- /dev/null +++ b/proton-c/bindings/cpp/src/contexts.cpp @@ -0,0 +1,92 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "contexts.h" +#include "proton/cpp/exceptions.h" +#include "Msg.h" +#include "proton/object.h" +#include "proton/session.h" +#include "proton/link.h" + +PN_HANDLE(PNI_CPP_CONNECTION_CONTEXT) +PN_HANDLE(PNI_CPP_SESSION_CONTEXT) +PN_HANDLE(PNI_CPP_LINK_CONTEXT) +PN_HANDLE(PNI_CPP_CONTAINER_CONTEXT) + +namespace proton { +namespace reactor { + +void setConnectionContext(pn_connection_t *pnConnection, ConnectionImpl *connection) { + pn_record_t *record = pn_connection_attachments(pnConnection); + pn_record_def(record, PNI_CPP_CONNECTION_CONTEXT, PN_VOID); + pn_record_set(record, PNI_CPP_CONNECTION_CONTEXT, connection); +} + +ConnectionImpl *getConnectionContext(pn_connection_t *pnConnection) { + if (!pnConnection) return NULL; + pn_record_t *record = pn_connection_attachments(pnConnection); + ConnectionImpl *p = (ConnectionImpl *) pn_record_get(record, PNI_CPP_CONNECTION_CONTEXT); + return p; +} + + +void setSessionContext(pn_session_t *pnSession, Session *session) { + pn_record_t *record = pn_session_attachments(pnSession); + pn_record_def(record, PNI_CPP_SESSION_CONTEXT, PN_VOID); + pn_record_set(record, PNI_CPP_SESSION_CONTEXT, session); +} + +Session *getSessionContext(pn_session_t *pnSession) { + if (!pnSession) return NULL; + pn_record_t *record = pn_session_attachments(pnSession); + Session *p = (Session *) pn_record_get(record, PNI_CPP_SESSION_CONTEXT); + return p; +} + + +void setLinkContext(pn_link_t *pnLink, Link *link) { + pn_record_t *record = pn_link_attachments(pnLink); + pn_record_def(record, PNI_CPP_LINK_CONTEXT, PN_VOID); + pn_record_set(record, PNI_CPP_LINK_CONTEXT, link); +} + +Link *getLinkContext(pn_link_t *pnLink) { + if (!pnLink) return NULL; + pn_record_t *record = pn_link_attachments(pnLink); + Link *p = (Link *) pn_record_get(record, PNI_CPP_LINK_CONTEXT); + return p; +} + + +void setContainerContext(pn_reactor_t *pnReactor, ContainerImpl *container) { + pn_record_t *record = pn_reactor_attachments(pnReactor); + pn_record_def(record, PNI_CPP_CONTAINER_CONTEXT, PN_VOID); + pn_record_set(record, PNI_CPP_CONTAINER_CONTEXT, container); +} + +ContainerImpl *getContainerContext(pn_reactor_t *pnReactor) { + pn_record_t *record = pn_reactor_attachments(pnReactor); + ContainerImpl *p = (ContainerImpl *) pn_record_get(record, PNI_CPP_CONTAINER_CONTEXT); + if (!p) throw ProtonException(MSG("Reactor has no C++ container context")); + return p; +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/contexts.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.h b/proton-c/bindings/cpp/src/contexts.h new file mode 100644 index 0000000..c04a77a --- /dev/null +++ b/proton-c/bindings/cpp/src/contexts.h @@ -0,0 +1,48 @@ +#ifndef PROTON_CPP_CONTEXTS_H +#define PROTON_CPP_CONTEXTS_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/reactor.h" +#include "proton/connection.h" + +namespace proton { +namespace reactor { + +class ConnectionImpl; +void setConnectionContext(pn_connection_t *pnConnection, ConnectionImpl *connection); +ConnectionImpl *getConnectionContext(pn_connection_t *pnConnection); + +class Session; +void setSessionContext(pn_session_t *pnSession, Session *session); +Session *getSessionContext(pn_session_t *pnSession); + +class Link; +void setLinkContext(pn_link_t *pnLink, Link *link); +Link *getLinkContext(pn_link_t *pnLink); + +class ContainerImpl; +void setContainerContext(pn_reactor_t *pnReactor, ContainerImpl *container); +ContainerImpl *getContainerContext(pn_reactor_t *pnReactor); + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_CONTEXTS_H*/ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/exceptions.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/exceptions.cpp b/proton-c/bindings/cpp/src/exceptions.cpp new file mode 100644 index 0000000..c19e61d --- /dev/null +++ b/proton-c/bindings/cpp/src/exceptions.cpp @@ -0,0 +1,33 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "proton/cpp/exceptions.h" + +namespace proton { +namespace reactor { + +ProtonException::ProtonException(const std::string& msg) throw() : message(msg) {} +ProtonException::~ProtonException() throw() {} +const char* ProtonException::what() const throw() { return message.c_str(); } + +MessageReject::MessageReject(const std::string& msg) throw() : ProtonException(msg) {} +MessageRelease::MessageRelease(const std::string& msg) throw() : ProtonException(msg) {} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/platform.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/platform.cpp b/proton-c/bindings/cpp/src/platform.cpp new file mode 100644 index 0000000..4eac408 --- /dev/null +++ b/proton-c/bindings/cpp/src/platform.cpp @@ -0,0 +1,79 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "platform.h" +#include <string> + +// Copy neccesary platform neutral functionality from Proton-C +// TODO: make this sensibly maintainable (even though it is mostly static) + +#ifdef USE_UUID_GENERATE +#include <uuid/uuid.h> +#include <stdlib.h> +char* pn_i_genuuid(void) { + char *generated = (char *) malloc(37*sizeof(char)); + uuid_t uuid; + uuid_generate(uuid); + uuid_unparse(uuid, generated); + return generated; +} +#elif USE_UUID_CREATE +#include <uuid.h> +char* pn_i_genuuid(void) { + char *generated; + uuid_t uuid; + uint32_t rc; + uuid_create(&uuid, &rc); + // Under FreeBSD the returned string is newly allocated from the heap + uuid_to_string(&uuid, &generated, &rc); + return generated; +} +#elif USE_WIN_UUID +#include <rpc.h> +char* pn_i_genuuid(void) { + unsigned char *generated; + UUID uuid; + UuidCreate(&uuid); + UuidToString(&uuid, &generated); + char* r = pn_strdup((const char*)generated); + RpcStringFree(&generated); + return r; +} +#else +#error "Don't know how to generate uuid strings on this platform" +#endif + + + +namespace proton { +namespace reactor { + +// include Proton-c platform routines into a local namespace + + +std::string generateUuid() { + char *s = pn_i_genuuid(); + std::string url(s); + free(s); + return url; +} + +}} // namespace proton::reactor http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ad7c9778/proton-c/bindings/cpp/src/platform.h ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/platform.h b/proton-c/bindings/cpp/src/platform.h new file mode 100644 index 0000000..5e5b726 --- /dev/null +++ b/proton-c/bindings/cpp/src/platform.h @@ -0,0 +1,39 @@ +#ifndef PROTON_CPP_PLATFORM_H +#define PROTON_CPP_PLATFORM_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/cpp/ProtonHandler.h" +#include "proton/event.h" +#include "proton/reactor.h" +#include <string> + + +namespace proton { +namespace reactor { + +std::string generateUuid(); +// Todo: TimeNow(); + +}} // namespace proton::reactor + +#endif /*!PROTON_CPP_PLATFORM_H*/ --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
