PROTON-1036: c++: engine API for integration with external IO frameworks

proton::engine wraps the pn transport, connection and collector objects and
provides a simple bytes-in/bytes-out interface that can be used to integrate
proton with any IO framework.

- added proton::engine
- added proton::event_loop base class for proton::engine and proton::container.
- extracted broker.hpp: common code for example brokers (queue and handler)
- added select_broker.cpp example
- added url::port_int() to get the integer value of a URL.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/193a7dd5
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/193a7dd5
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/193a7dd5

Branch: refs/heads/go1
Commit: 193a7dd5d896ce62143b4a52179de067ae491abb
Parents: 4f5e18a
Author: Alan Conway <[email protected]>
Authored: Mon Nov 2 17:56:03 2015 -0500
Committer: Alan Conway <[email protected]>
Committed: Mon Nov 2 18:14:08 2015 -0500

----------------------------------------------------------------------
 examples/cpp/CMakeLists.txt                     |   6 +
 examples/cpp/broker.cpp                         | 171 ++-------------
 examples/cpp/broker.hpp                         | 213 +++++++++++++++++++
 examples/cpp/example_test.py                    |   3 +-
 examples/cpp/select_broker.cpp                  | 186 ++++++++++++++++
 proton-c/bindings/cpp/CMakeLists.txt            |   1 +
 proton-c/bindings/cpp/README.md                 |   2 -
 proton-c/bindings/cpp/docs/mainpage.md          |   6 +
 .../bindings/cpp/include/proton/connection.hpp  |  21 +-
 .../bindings/cpp/include/proton/container.hpp   |   4 +-
 proton-c/bindings/cpp/include/proton/engine.hpp | 155 ++++++++++++++
 proton-c/bindings/cpp/include/proton/event.hpp  |   9 +-
 .../bindings/cpp/include/proton/event_loop.hpp  |  41 ++++
 proton-c/bindings/cpp/include/proton/types.hpp  |   3 -
 proton-c/bindings/cpp/include/proton/url.hpp    |   4 +
 proton-c/bindings/cpp/src/connection.cpp        |   9 +-
 proton-c/bindings/cpp/src/connector.cpp         |   2 +-
 proton-c/bindings/cpp/src/container_impl.cpp    |   2 +-
 proton-c/bindings/cpp/src/engine.cpp            | 132 ++++++++++++
 proton-c/bindings/cpp/src/event.cpp             |   9 +
 proton-c/bindings/cpp/src/messaging_event.cpp   |   6 +-
 proton-c/bindings/cpp/src/messaging_event.hpp   |   2 +-
 proton-c/bindings/cpp/src/proton_event.cpp      |  25 ++-
 proton-c/bindings/cpp/src/proton_event.hpp      | 109 +++++-----
 proton-c/bindings/cpp/src/url.cpp               |  15 +-
 25 files changed, 901 insertions(+), 235 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index 34edb83..d5925b8 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -32,6 +32,7 @@ foreach(example
     direct_send
     sync_client
     client
+    select_broker
     server
     server_direct
     recurring_timer
@@ -54,3 +55,8 @@ endif(WIN32)
 
 add_test(NAME cpp_example_test
   COMMAND ${PYTHON_EXECUTABLE} ${env_py} -- "PATH=${test_path}" 
"PYTHONPATH=${CMAKE_CURRENT_SOURCE_DIR}" ${VALGRIND_ENV} ${PYTHON_EXECUTABLE} 
-m unittest -v example_test)
+
+set(broker_tests example_test.ExampleTest.test_request_response 
example_test.ExampleTest.test_simple_send_recv)
+
+add_test(NAME cpp_example_select_test
+  COMMAND ${PYTHON_EXECUTABLE} ${env_py} -- "PATH=${test_path}" 
"PYTHONPATH=${CMAKE_CURRENT_SOURCE_DIR}" "TEST_BROKER=select_broker" 
${VALGRIND_ENV} ${PYTHON_EXECUTABLE} -m unittest -v ${broker_tests})

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/examples/cpp/broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.cpp b/examples/cpp/broker.cpp
index 080bc9c..a98d43d 100644
--- a/examples/cpp/broker.cpp
+++ b/examples/cpp/broker.cpp
@@ -20,177 +20,42 @@
  */
 
 #include "options.hpp"
+#include "broker.hpp"
 
 #include "proton/acceptor.hpp"
 #include "proton/container.hpp"
-#include "proton/messaging_handler.hpp"
-#include "proton/url.hpp"
 #include "proton/value.hpp"
 
 #include <iostream>
-#include <sstream>
 #include <deque>
 #include <map>
 #include <list>
 #include <string>
 
-class queue {
+class broker {
   public:
-    bool dynamic;
-    typedef std::deque<proton::message> message_queue;
-    typedef std::list<proton::counted_ptr<proton::sender> > sender_list;
-    message_queue messages;
-    sender_list consumers;
+    broker(const proton::url& url) : handler_(url, queues_) {}
 
-    queue(bool dyn = false) : dynamic(dyn) {}
+    proton::messaging_handler& handler() { return handler_; }
 
-    void subscribe(proton::sender &c) {
-        consumers.push_back(c.ptr());
-    }
-
-    bool unsubscribe(proton::sender &c) {
-        consumers.remove(c.ptr());
-        return (consumers.size() == 0 && (dynamic || messages.size() == 0));
-    }
-
-    void publish(const proton::message &m) {
-        messages.push_back(m);
-        dispatch(0);
-    }
-
-    void dispatch(proton::sender *s) {
-        while (deliver_to(s)) {}
-    }
-
-    bool deliver_to(proton::sender *consumer) {
-        // deliver to single consumer if supplied, else all consumers
-        int count = consumer ? 1 : consumers.size();
-        if (!count) return false;
-        bool result = false;
-        sender_list::iterator it = consumers.begin();
-        if (!consumer && count)
-            consumer = it->get();
-
-        while (messages.size()) {
-            if (consumer->credit()) {
-                consumer->send(messages.front());
-                messages.pop_front();
-                result = true;
-            }
-            if (--count)
-                it++;
-            else
-                return result;
-        }
-        return false;
-    }
-};
-
-class broker : public proton::messaging_handler {
   private:
-    typedef std::map<std::string, queue *> queue_map;
-    proton::url url;
-    queue_map queues;
-    uint64_t queue_count;       // Use to generate unique queue IDs.
-
-  public:
-
-    broker(const proton::url &u) : url(u), queue_count(0) {}
 
-    void on_start(proton::event &e) {
-        e.container().listen(url);
-        std::cout << "broker listening on " << url << std::endl;
-    }
+    class my_handler : public broker_handler {
+      public:
+        my_handler(const proton::url& u, queues& qs) : broker_handler(qs), 
url_(u) {}
 
-    class queue &get_queue(std::string &address) {
-        queue_map::iterator it = queues.find(address);
-        if (it == queues.end()) {
-            queues[address] = new queue();
-            return *queues[address];
-        }
-        else {
-            return *it->second;
+        void on_start(proton::event &e) {
+            e.container().listen(url_);
+            std::cout << "broker listening on " << url_ << std::endl;
         }
-    }
 
-    std::string queue_name() {
-        std::ostringstream os;
-        os << "q" << queue_count++;
-        return os.str();
-    }
+      private:
+        const proton::url& url_;
+    };
 
-    void on_link_opening(proton::event &e) {
-        proton::link& lnk = e.link();
-        if (lnk.sender()) {
-            proton::terminus &remote_source(lnk.remote_source());
-            if (remote_source.dynamic()) {
-                std::string address = queue_name();
-                lnk.source().address(address);
-                queue *q = new queue(true);
-                queues[address] = q;
-                q->subscribe(*lnk.sender());
-                std::cout << "broker dynamic outgoing link from " << address 
<< std::endl;
-            }
-            else {
-                std::string address = remote_source.address();
-                if (!address.empty()) {
-                    lnk.source().address(address);
-                    get_queue(address).subscribe(*lnk.sender());
-                    std::cout << "broker outgoing link from " << address << 
std::endl;
-                }
-            }
-        }
-        else {
-            std::string address = lnk.remote_target().address();
-            if (!address.empty())
-                lnk.target().address(address);
-            std::cout << "broker incoming link to " << address << std::endl;
-        }
-    }
-
-    void unsubscribe (proton::sender &lnk) {
-        std::string address = lnk.source().address();
-        queue_map::iterator it = queues.find(address);
-        if (it != queues.end() && it->second->unsubscribe(lnk)) {
-            delete it->second;
-            queues.erase(it);
-        }
-    }
-
-    void on_link_closing(proton::event &e) {
-        proton::link &lnk = e.link();
-        if (lnk.sender()) {
-            unsubscribe(*lnk.sender());
-        }
-    }
-
-    void on_connection_closing(proton::event &e) {
-        remove_stale_consumers(e.connection());
-    }
-
-    void on_disconnected(proton::event &e) {
-        remove_stale_consumers(e.connection());
-    }
-
-    void remove_stale_consumers(proton::connection &connection) {
-        proton::link_range r = 
connection.find_links(proton::endpoint::REMOTE_ACTIVE);
-        for (proton::link_iterator l = r.begin(); l != r.end(); ++l) {
-            if (l->sender()) {
-                unsubscribe(*l->sender());
-            }
-        }
-    }
-
-    void on_sendable(proton::event &e) {
-        proton::link& lnk = e.link();
-        std::string addr = lnk.source().address();
-        get_queue(addr).dispatch(lnk.sender());
-    }
-
-    void on_message(proton::event &e) {
-        std::string addr = e.link().target().address();
-        get_queue(addr).publish(e.message());
-    }
+  private:
+    queues queues_;
+    my_handler handler_;
 };
 
 int main(int argc, char **argv) {
@@ -200,8 +65,8 @@ int main(int argc, char **argv) {
     opts.add_value(url, 'a', "address", "listen on URL", "URL");
     try {
         opts.parse();
-        broker broker(url);
-        proton::container(broker).run();
+        broker b(url);
+        proton::container(b.handler()).run();
         return 0;
     } catch (const bad_option& e) {
         std::cout << opts << std::endl << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/examples/cpp/broker.hpp
----------------------------------------------------------------------
diff --git a/examples/cpp/broker.hpp b/examples/cpp/broker.hpp
new file mode 100644
index 0000000..88d2e33
--- /dev/null
+++ b/examples/cpp/broker.hpp
@@ -0,0 +1,213 @@
+#ifndef BROKER_HPP
+#define BROKER_HPP
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+/**@file
+ *
+ * Common code used by different broker examples.
+ *
+ * The examples add functionality as needed, this helps to make it easier to 
see
+ * the important differences between the examples.
+ */
+
+#include "proton/messaging_handler.hpp"
+#include "proton/url.hpp"
+
+#include <iostream>
+#include <deque>
+#include <map>
+#include <list>
+#include <sstream>
+
+bool debug_enabled() {
+    const char *s = ::getenv("BROKER_DEBUG");
+    return s && *s;             // Set to non-empty string
+}
+
+/// Debug log messages to stderr.
+#define LOG_DEBUG(MSG) do { if (debug_enabled()) { std::cerr << MSG << 
std::endl; } } while(0)
+
+/** A simple implementation of a queue. */
+class queue {
+  public:
+    queue(const std::string &name, bool dynamic = false) : name_(name), 
dynamic_(dynamic) {}
+
+    std::string name() const { return name_; }
+
+    void subscribe(proton::sender &s) {
+        LOG_DEBUG("queue " << name_ << ": subscribe " << s.name());
+        consumers_.push_back(s.ptr());
+    }
+
+    // Return true if queue can be deleted.
+    bool unsubscribe(proton::sender &s) {
+        LOG_DEBUG("queue " << name_ << ": unsubscribe " << s.name());
+        consumers_.remove(s.ptr());
+        return (consumers_.size() == 0 && (dynamic_ || messages_.size() == 0));
+    }
+
+    void publish(const proton::message &m, proton::receiver *r) {
+        LOG_DEBUG("queue " << name_ << ": receive from " << r->name() << " : " 
<< m.body());
+        messages_.push_back(m);
+        dispatch(0);
+    }
+
+    void dispatch(proton::sender *s) {
+        while (deliver_to(s)) {}
+    }
+
+    bool deliver_to(proton::sender *s) {
+        // deliver to single sender if supplied, else all consumers
+        int count = s ? 1 : consumers_.size();
+        if (!count) return false;
+        bool result = false;
+        sender_list::iterator it = consumers_.begin();
+        if (!s && count)
+            s = it->get();
+
+        while (messages_.size()) {
+            if (s->credit()) {
+                const proton::message& m = messages_.front();
+                LOG_DEBUG("queue " << name_ << ": send to " << s->name() << " 
: " << m.body());
+                s->send(m);
+                messages_.pop_front();
+                result = true;
+            }
+            if (--count)
+                it++;
+            else
+                return result;
+        }
+        return false;
+    }
+
+  private:
+    typedef std::deque<proton::message> message_queue;
+    typedef std::list<proton::counted_ptr<proton::sender> > sender_list;
+
+    std::string name_;
+    bool dynamic_;
+    message_queue messages_;
+    sender_list consumers_;
+};
+
+/** A collection of queues and queue factory, used by a broker */
+class queues {
+  public:
+    queues() : next_id_(0) {}
+
+    // Get or create a queue.
+    virtual queue &get(const std::string &address = std::string()) {
+        if (address.empty()) throw std::runtime_error("empty queue name");
+        queue*& q = queues_[address];
+        if (!q) q = new queue(address);
+        return *q;
+    }
+
+    // Create a dynamic queue with a unique name.
+    virtual queue &dynamic() {
+        std::ostringstream os;
+        os << "q" << next_id_++;
+        queue *q = queues_[os.str()] = new queue(os.str(), true);
+        return *q;
+    }
+
+    // Delete the named queue
+    virtual void erase(std::string &name) {
+        delete queues_[name];
+        queues_.erase(name);
+    }
+
+  protected:
+    typedef std::map<std::string, queue *> queue_map;
+    queue_map queues_;
+    uint64_t next_id_;       // Use to generate unique queue IDs.
+};
+
+
+/** Common handler logic for brokers. */
+class broker_handler : public proton::messaging_handler {
+  public:
+    broker_handler(queues& qs) : queues_(qs) {}
+
+    void on_link_opening(proton::event &e) {
+        proton::link& lnk = e.link();
+        if (lnk.sender()) {
+            proton::terminus &remote_source(lnk.remote_source());
+            queue &q = remote_source.dynamic() ?
+                queues_.dynamic() : queues_.get(remote_source.address());
+            lnk.source().address(q.name());
+            q.subscribe(*lnk.sender());
+            std::cout << "broker outgoing link from " << q.name() << std::endl;
+        }
+        else {                  // Receiver
+            std::string address = lnk.remote_target().address();
+            if (!address.empty()) {
+                lnk.target().address(address);
+                std::cout << "broker incoming link to " << address << 
std::endl;
+            }
+        }
+    }
+
+    void unsubscribe (proton::sender &lnk) {
+        std::string address = lnk.source().address();
+        if (queues_.get(address).unsubscribe(lnk))
+            queues_.erase(address);
+    }
+
+    void on_link_closing(proton::event &e) {
+        proton::link &lnk = e.link();
+        if (lnk.sender()) 
+            unsubscribe(*lnk.sender());
+    }
+
+    void on_connection_closing(proton::event &e) {
+        remove_stale_consumers(e.connection());
+    }
+
+    void on_disconnected(proton::event &e) {
+        remove_stale_consumers(e.connection());
+    }
+
+    void remove_stale_consumers(proton::connection &connection) {
+        proton::link_range r = 
connection.find_links(proton::endpoint::REMOTE_ACTIVE);
+        for (proton::link_iterator l = r.begin(); l != r.end(); ++l) {
+            if (l->sender())
+                unsubscribe(*l->sender());
+        }
+    }
+
+    void on_sendable(proton::event &e) {
+        proton::link& lnk = e.link();
+        std::string address = lnk.source().address();
+        queues_.get(address).dispatch(lnk.sender());
+    }
+
+    void on_message(proton::event &e) {
+        std::string address = e.link().target().address();
+        queues_.get(address).publish(e.message(), e.link().receiver());
+    }
+
+  protected:
+    queues& queues_;
+};
+
+
+#endif // BROKER_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index d2b3169..3ed8b9c 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -97,8 +97,9 @@ class Broker(object):
             cls._broker = None
 
     def __init__(self):
+        broker_exe = os.environ.get("TEST_BROKER") or "broker"
         self.addr = pick_addr()
-        cmd = cmdline("broker", "-a", self.addr)
+        cmd = cmdline(broker_exe, "-a", self.addr)
         try:
             self.process = Popen(cmd, stdout=NULL, stderr=sys.stderr)
             wait_addr(self.addr)

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/examples/cpp/select_broker.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/select_broker.cpp b/examples/cpp/select_broker.cpp
new file mode 100644
index 0000000..49cc2dc
--- /dev/null
+++ b/examples/cpp/select_broker.cpp
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "options.hpp"
+#include "broker.hpp"
+
+#include "proton/engine.hpp"
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+#include <sys/types.h>
+#include <sys/socket.h>
+#include <netinet/in.h>
+#include <arpa/inet.h>
+#include <netdb.h>
+
+#include <sys/select.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <err.h>
+#include <errno.h>
+
+template <class T> T check(T result, const std::string& msg=std::string()) {
+     // Note strerror is thread unsafe, this example is single-threaded.
+    if (result < 0) throw std::runtime_error(msg + ": " + strerror(errno));
+    return result;
+}
+
+static void fd_set_if(bool on, int fd, fd_set *fds) {
+    if (on)
+        FD_SET(fd, fds);
+    else
+        FD_CLR(fd, fds);
+}
+
+class broker {
+
+    typedef std::map<int, proton::engine*> engine_map;
+
+    queues queues_;
+    broker_handler handler_;
+    engine_map engines_;
+    fd_set reading_, writing_;
+    int listen_;
+
+  public:
+    broker() : handler_(queues_) {
+        FD_ZERO(&reading_);
+        FD_ZERO(&writing_);
+    }
+
+    ~broker() {
+        for (engine_map::iterator i = engines_.begin(); i != engines_.end(); 
++i)
+            delete i->second;
+    }
+
+    void run(uint16_t port) {
+
+        listen(port);
+
+        while(true) {
+            fd_set readable_set = reading_;
+            fd_set writable_set = writing_;
+
+            check(::select(FD_SETSIZE, &readable_set, &writable_set, NULL, 
NULL), "select");
+            for (int fd = 0; fd < FD_SETSIZE; ++fd) {
+                if (fd == listen_ && FD_ISSET(fd, &readable_set)) 
+                    accept();
+
+                if (engines_.find(fd) != engines_.end()) {
+                    proton::engine& eng = *engines_[fd];
+                    try {
+                        if (FD_ISSET(fd, &readable_set))
+                            readable(fd, eng);
+
+                        if (FD_ISSET(fd, &writable_set))
+                            writable(fd, eng);
+                    } catch (const std::exception& e) {
+                        std::cout << e.what() << " fd=" << fd << std::endl;
+                        eng.close_input();
+                        eng.close_output();
+                    }
+                    // Set reading/writing bits for next time around
+                    fd_set_if(eng.input().size(), fd, &reading_);
+                    fd_set_if(eng.output().size(), fd, &writing_);
+
+                    if (eng.closed()) {
+                        ::close(fd);
+                        delete engines_[fd];
+                        engines_.erase(fd);
+                    }
+                }
+            }
+        }
+    }
+
+  private:
+
+    void listen(uint16_t port) {
+        listen_ = check(::socket(PF_INET, SOCK_STREAM, 0), "create listener");
+        int yes = 1;
+        check(::setsockopt(listen_, SOL_SOCKET, SO_REUSEADDR, &yes, 
sizeof(int)), "setsockopt");
+        struct sockaddr_in name;
+        name.sin_family = AF_INET;
+        name.sin_port = htons (port);
+        name.sin_addr.s_addr = htonl (INADDR_ANY);
+        check(::bind(listen_, (struct sockaddr *)&name, sizeof(name)), "bind 
listener");
+        check(::listen(listen_, 32), "listen");
+        std::cout << "listening on port " << port << " fd=" << listen_ << 
std::endl;
+        FD_SET(listen_, &reading_);
+    }
+
+    void accept() {
+        struct sockaddr_in client_addr;
+        socklen_t size = sizeof(client_addr);
+        int fd = check(::accept(listen_, (struct sockaddr *)&client_addr, 
&size), "accept");
+        engines_[fd] = new proton::engine(handler_);
+        FD_SET(fd, &reading_);
+        FD_SET(fd, &writing_);
+        std::cout << "accept " << ::inet_ntoa(client_addr.sin_addr)
+                  << ":" << ::ntohs(client_addr.sin_port)
+                  << " fd=" << fd << std::endl;
+    }
+
+    void readable(int fd, proton::engine& eng) {
+        proton::buffer<char> input = eng.input();
+        if (input.size()) {
+            ssize_t n = check(::read(fd, input.begin(), input.size()));
+            if (n > 0) {
+                eng.received(n);
+            } else {
+                eng.close_input();
+            }
+        }
+    }
+
+    void writable(int fd, proton::engine& eng) {
+        proton::buffer<const char> output = eng.output();
+        if (output.size()) {
+            ssize_t n = check(::write(fd, output.begin(), output.size()));
+            if (n > 0)
+                eng.sent(n);
+            else {
+                eng.close_output();
+            }
+        }
+    }
+
+};
+
+int main(int argc, char **argv) {
+    // Command line options
+    proton::url url("0.0.0.0");
+    options opts(argc, argv);
+    opts.add_value(url, 'a', "address", "listen on URL", "URL");
+    try {
+        opts.parse();
+        broker().run(url.port_int());
+        return 0;
+    } catch (const bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+    return 1;
+}
+
+

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt 
b/proton-c/bindings/cpp/CMakeLists.txt
index 0c44fb9..3995c20 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -43,6 +43,7 @@ set(qpid-proton-cpp-source
   src/duration.cpp
   src/encoder.cpp
   src/endpoint.cpp
+  src/engine.cpp
   src/error.cpp
   src/event.cpp
   src/facade.cpp

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/README.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/README.md b/proton-c/bindings/cpp/README.md
index d84813b..c89534e 100644
--- a/proton-c/bindings/cpp/README.md
+++ b/proton-c/bindings/cpp/README.md
@@ -17,8 +17,6 @@ Tests
 Bugs
 - Error handling:
   - examples exit silently on broker exit/not running, core on no-such-queue 
(e.g. with qpidd)
-  - TODO/FIXME notes in code.
-- const correctness: consistent use of const where semantically appropriate in 
C++ APIs.
 
 Features
 - SASL/SSL support with interop tests.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/docs/mainpage.md
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/docs/mainpage.md 
b/proton-c/bindings/cpp/docs/mainpage.md
index b370b13..9129307 100644
--- a/proton-c/bindings/cpp/docs/mainpage.md
+++ b/proton-c/bindings/cpp/docs/mainpage.md
@@ -98,6 +98,12 @@ complex AMQP types. For details on converting between AMQP 
and C++ data types
 see the \ref encode_decode.cpp example and classes `proton::encoder`,
 `proton::decoder` and `proton::value`.
 
+`proton::engine` provides fewer facilities that `proton::container` but makes
+fewer (no) assumptions about application threading and IO. It may be easier to
+integrate with an existing IO framework, for multi-threaded applications or for
+non-socket IO. See the \ref select_broker.cpp example. The main application and
+AMQP logic is implemented the same way using the engine or the container.
+
 Delivery Guarantees
 -------------------
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/connection.hpp 
b/proton-c/bindings/cpp/include/proton/connection.hpp
index cacf004..68d46a3 100644
--- a/proton-c/bindings/cpp/include/proton/connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/connection.hpp
@@ -32,17 +32,28 @@ struct pn_connection_t;
 namespace proton {
 
 class handler;
-class transport;
+class engine;
 
 /** connection to a remote AMQP peer. */
 class connection : public counted_facade<pn_connection_t, connection, endpoint>
 {
   public:
-    ///@name getters @{
-    PN_CPP_EXTERN class transport& transport() const;
-    PN_CPP_EXTERN class container& container() const;
+
+    /// Get the event_loop, can be a container or an engine.
+    PN_CPP_EXTERN class event_loop &event_loop() const;
+
+    /// Get the container, throw an exception if event_loop is not a container.
+    PN_CPP_EXTERN class container &container() const;
+
+    /// Get the engine, , throw an exception if event_loop is not an engine.
+    PN_CPP_EXTERN class engine &engine() const;
+
+    /// Return the AMQP host name for the connection.
     PN_CPP_EXTERN std::string host() const;
-    ///@}
+
+    /// Return the container-ID for the connection. All connections have a 
container_id,
+    /// even if they don't have a container event_loop.
+    PN_CPP_EXTERN std::string container_id() const;
 
     /** Initiate local open, not complete till 
messaging_handler::on_connection_opened()
      * or proton_handler::on_connection_remote_open()

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp 
b/proton-c/bindings/cpp/include/proton/container.hpp
index 059416f..de91505 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -23,6 +23,7 @@
  */
 #include "proton/duration.hpp"
 #include "proton/export.hpp"
+#include "proton/event_loop.hpp"
 #include "proton/pn_unique_ptr.hpp"
 #include "proton/reactor.hpp"
 #include "proton/url.hpp"
@@ -47,8 +48,7 @@ class container_impl;
  * Note that by default, links belonging to the container have generated 
link-names
  * of the form
  */
-class container
-{
+class container : public event_loop {
   public:
     /// Container ID should be unique within your system. By default a random 
ID is generated.
     PN_CPP_EXTERN container(const std::string& id=std::string());

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/engine.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/engine.hpp 
b/proton-c/bindings/cpp/include/proton/engine.hpp
new file mode 100644
index 0000000..672e911
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/engine.hpp
@@ -0,0 +1,155 @@
+#ifndef ENGINE_HPP
+#define ENGINE_HPP
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/pn_unique_ptr.hpp"
+#include "proton/export.hpp"
+#include "proton/event_loop.hpp"
+
+#include <cstddef>
+#include <utility>
+
+namespace proton {
+
+class handler;
+class connection;
+
+/// Pointers to a data buffer.
+template <class T> class buffer {
+  public:
+    explicit buffer(T* begin__=0, T* end__=0) : begin_(begin__), end_(end__) {}
+    explicit buffer(T* ptr, size_t n) : begin_(ptr), end_(ptr + n) {}
+    T* begin() const { return begin_; }
+    T* end() const { return end_; }
+    size_t size() const { return end() - begin(); }
+    bool empty() const { return !size(); }
+
+ private:
+    T* begin_;
+    T* end_;
+};
+
+/**
+ * An engine is an event_loop that manages a single AMQP connection.  It is
+ * useful for integrating AMQP into an existing IO framework.
+ *
+ * The engine provides a simple "bytes-in/bytes-out" interface. Incoming AMQP
+ * bytes from any kind of data connection are fed into the engine and processed
+ * to dispatch events to a proton::handler.  The resulting AMQP output data is
+ * available from the engine and can sent back over the connection.
+ *
+ * The engine does no IO of its own. It assumes a two-way flow of bytes over
+ * some externally-managed "connection". The "connection" could be a socket
+ * managed by select, poll, epoll or some other mechanism, or it could be
+ * something else such as an RDMA connection, a shared-memory buffer or a Unix
+ * pipe.
+ *
+ * The engine is an alternative event_loop to the proton::container. The
+ * container is easier to use in single-threaded, stand-alone applications that
+ * want to use standard socket connections.  The engine can be embedding into
+ * any existing IO framework for any type of IO.
+ *
+ * The application is coded the same way for engine or container: you implement
+ * proton::handler. Handlers attached to an engine will receive transport,
+ * connection, session, link and message events. They will not receive reactor,
+ * selectable or timer events, the engine assumes those are managed externally.
+ *
+ * THREAD SAFETY: A single engine instance cannot be called concurrently, but
+ * different engine instances can be processed concurrently in separate 
threads.
+ */
+class engine : public event_loop {
+  public:
+
+    // TODO aconway 2015-11-02: engine() take connection-options.
+    // TODO aconway 2015-11-02: engine needs to accept application events.
+    // TODO aconway 2015-11-02: generalize reconnect logic for container and 
engine.
+
+    /**
+     * Create an engine that will advertise id as the AMQP container-id for 
its connection.
+     */
+    PN_CPP_EXTERN engine(handler&, const std::string& id=std::string());
+
+    PN_CPP_EXTERN ~engine();
+
+    /**
+     * Input buffer. If input.size() == 0 means no input can be accepted right 
now, but
+     * sending output might free up space.
+     */
+    PN_CPP_EXTERN buffer<char> input();
+
+    /**
+     * Process n bytes from input(). Calls handler functions for the AMQP 
events
+     * encoded by the received data.
+     *
+     * After the call the input() and output() buffers may have changed.
+     */
+    PN_CPP_EXTERN void received(size_t n);
+
+    /**
+     * Indicate that no more input data is available from the external
+     * connection. May call handler functions.
+     */
+    PN_CPP_EXTERN void close_input();
+
+    /**
+     * Output buffer. Send data from this buffer and call sent() to indicate it
+     * has been sent. If output().size() == 0 there is no data to send,
+     * receiving more input data might make output data available.
+     */
+    PN_CPP_EXTERN buffer<const char> output();
+
+    /**
+     * Call when the first n bytes from the start of the output buffer have 
been
+     * sent.  May call handler functions.
+     *
+     * After the call the input() and output() buffers may have changed.
+     */
+    PN_CPP_EXTERN void sent(size_t n);
+
+    /**
+     * Indicate that no more output data can be sent to the external 
connection.
+     * May call handler functions.
+     */
+    PN_CPP_EXTERN void close_output();
+
+    /**
+     * True if engine is closed. This can either be because close_input() and
+     * close_output() were called to indicate the external connection has
+     * closed, or because AMQP close frames were received in the AMQP data.  In
+     * either case no more input() buffer space or output() data will be
+     * available for this engine.
+     */
+    PN_CPP_EXTERN bool closed() const;
+
+    /** The AMQP connection associated with this engine. */
+    PN_CPP_EXTERN class connection&  connection() const;
+
+    /** The AMQP container-id associated with this engine. */
+    PN_CPP_EXTERN  std::string id() const;
+
+  private:
+    void run();
+
+    struct impl;
+    pn_unique_ptr<impl> impl_;
+};
+
+}
+#endif // ENGINE_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/event.hpp 
b/proton-c/bindings/cpp/include/proton/event.hpp
index 6fb0c94..ed310f1 100644
--- a/proton-c/bindings/cpp/include/proton/event.hpp
+++ b/proton-c/bindings/cpp/include/proton/event.hpp
@@ -45,8 +45,15 @@ class event {
     /// Return the name of the event type
     virtual PN_CPP_EXTERN std::string name() const = 0;
 
-    /// Get container.
+    /// Get the event_loop object, can be a container or an engine.
+    virtual PN_CPP_EXTERN class event_loop &event_loop() const;
+
+    /// Get the container, throw an exception if event_loop is not a container.
     virtual PN_CPP_EXTERN class container &container() const;
+
+    /// Get the engine, , throw an exception if event_loop is not an engine.
+    virtual PN_CPP_EXTERN class engine &engine() const;
+
     /// Get connection.
     virtual PN_CPP_EXTERN class connection &connection() const;
     /// Get sender @throws error if no sender.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/event_loop.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/event_loop.hpp 
b/proton-c/bindings/cpp/include/proton/event_loop.hpp
new file mode 100644
index 0000000..e5dff37
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/event_loop.hpp
@@ -0,0 +1,41 @@
+#ifndef EVENT_LOOP_HPP
+#define EVENT_LOOP_HPP
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/export.hpp"
+
+#include <string>
+
+/**
+ * An event_loop dispatches events to event handlers.  event_loop is an 
abstract
+ * class, concrete subclasses are proton::container and prton::engine.
+*/
+class event_loop {
+  public:
+    PN_CPP_EXTERN virtual ~event_loop() {}
+
+    /// The AMQP container-id associated with this event loop.
+    /// Any connections managed by this event loop will have this container id.
+    PN_CPP_EXTERN  virtual std::string id() const = 0;
+
+    // TODO aconway 2015-11-02: injecting application events.
+};
+
+#endif // EVENT_LOOP_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/types.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/types.hpp 
b/proton-c/bindings/cpp/include/proton/types.hpp
index b1e7085..0f82ae3 100644
--- a/proton-c/bindings/cpp/include/proton/types.hpp
+++ b/proton-c/bindings/cpp/include/proton/types.hpp
@@ -121,9 +121,6 @@ struct amqp_binary : public std::string {
     operator pn_bytes_t() const { return pn_bytes(*this); }
 };
 
-// TODO aconway 2015-06-11: alternative representation of variable-length data
-// as pointer to existing buffer.
-
 /// Template for opaque proton proton types that can be treated as byte arrays.
 template <class P> struct opaque: public comparable<opaque<P> > {
     P value;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/include/proton/url.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/url.hpp 
b/proton-c/bindings/cpp/include/proton/url.hpp
index 42abadd..e8c68aa 100644
--- a/proton-c/bindings/cpp/include/proton/url.hpp
+++ b/proton-c/bindings/cpp/include/proton/url.hpp
@@ -20,6 +20,7 @@
  */
 
 #include "proton/facade.hpp"
+#include "proton/types.hpp"
 #include "proton/error.hpp"
 #include <iosfwd>
 
@@ -89,6 +90,9 @@ class url {
     PN_CPP_EXTERN std::string host() const;
     /** port is a string, it can be a number or a symbolic name like "amqp" */
     PN_CPP_EXTERN std::string port() const;
+    /** port_int is the numeric value of the port. */
+    PN_CPP_EXTERN uint16_t port_int() const;
+    /** path is everything after the final "/" */
     PN_CPP_EXTERN std::string path() const;
     //@}
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection.cpp 
b/proton-c/bindings/cpp/src/connection.cpp
index 1b35e03..2d401bd 100644
--- a/proton-c/bindings/cpp/src/connection.cpp
+++ b/proton-c/bindings/cpp/src/connection.cpp
@@ -37,10 +37,6 @@
 
 namespace proton {
 
-transport &connection::transport() const {
-    return *transport::cast(pn_connection_transport(pn_cast(this)));
-}
-
 void connection::open() { pn_connection_open(pn_cast(this)); }
 
 void connection::close() { pn_connection_close(pn_cast(this)); }
@@ -49,6 +45,11 @@ std::string connection::host() const {
     return std::string(pn_connection_get_hostname(pn_cast(this)));
 }
 
+std::string connection::container_id() const {
+    const char* id = pn_connection_get_container(pn_cast(this));
+    return id ? std::string(id) : std::string();
+}
+
 container& connection::container() const {
     return container_context(pn_object_reactor(pn_cast(this)));
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/connector.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connector.cpp 
b/proton-c/bindings/cpp/src/connector.cpp
index 49660ea..3333441 100644
--- a/proton-c/bindings/cpp/src/connector.cpp
+++ b/proton-c/bindings/cpp/src/connector.cpp
@@ -40,7 +40,7 @@ void connector::address(const url &a) {
 
 void connector::connect() {
     pn_connection_t *conn = pn_cast(connection_);
-    pn_connection_set_container(conn, connection_->container().id().c_str());
+    pn_connection_set_container(conn, connection_->container_id().c_str());
     pn_connection_set_hostname(conn, address_.host_port().c_str());
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.cpp 
b/proton-c/bindings/cpp/src/container_impl.cpp
index fc97a3e..6dfd797 100644
--- a/proton-c/bindings/cpp/src/container_impl.cpp
+++ b/proton-c/bindings/cpp/src/container_impl.cpp
@@ -63,7 +63,7 @@ struct handler_context {
     static void dispatch(pn_handler_t *c_handler, pn_event_t *c_event, 
pn_event_type_t type)
     {
         handler_context& hc(handler_context::get(c_handler));
-        messaging_event mevent(c_event, type, *hc.container_);
+        messaging_event mevent(c_event, type, hc.container_);
         mevent.dispatch(*hc.handler_);
         return;
     }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/engine.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/engine.cpp 
b/proton-c/bindings/cpp/src/engine.cpp
new file mode 100644
index 0000000..61d4040
--- /dev/null
+++ b/proton-c/bindings/cpp/src/engine.cpp
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "proton/engine.hpp"
+#include "proton/error.hpp"
+
+#include "uuid.hpp"
+#include "proton_bits.hpp"
+#include "messaging_event.hpp"
+
+#include <proton/connection.h>
+#include <proton/transport.h>
+#include <proton/event.h>
+
+namespace proton {
+
+struct engine::impl {
+
+    impl(class handler& h, pn_transport_t *t) :
+        handler(h), transport(t), connection(pn_connection()), 
collector(pn_collector())
+    {}
+
+    ~impl() {
+        pn_transport_free(transport);
+        pn_connection_free(connection);
+        pn_collector_free(collector);
+    }
+
+    void check(int err, const std::string& msg) {
+        if (err)
+            throw proton::error(msg + error_str(pn_transport_error(transport), 
err));
+    }
+
+    pn_event_t *peek() { return pn_collector_peek(collector); }
+    void pop() { pn_collector_pop(collector); }
+
+    class handler& handler;
+    pn_transport_t *transport;
+    pn_connection_t *connection;
+    pn_collector_t * collector;
+};
+
+engine::engine(handler &h, const std::string& id_) : impl_(new impl(h, 
pn_transport())) {
+    if (!impl_->transport || !impl_->connection || !impl_->collector) 
+        throw error("engine setup failed");
+    std::string id = id_.empty() ? uuid().str() : id_;
+    pn_connection_set_container(impl_->connection, id.c_str());
+    impl_->check(pn_transport_bind(impl_->transport, impl_->connection), 
"engine bind: ");
+    pn_connection_collect(impl_->connection, impl_->collector);
+}
+
+engine::~engine() {}
+
+buffer<char> engine::input() {
+    ssize_t n = pn_transport_capacity(impl_->transport);
+    if (n <= 0)
+        return buffer<char>();
+    return buffer<char>(pn_transport_tail(impl_->transport), n);
+}
+
+void engine::close_input() {
+    pn_transport_close_tail(impl_->transport);
+    run();
+}
+
+void engine::received(size_t n) {
+    impl_->check(pn_transport_process(impl_->transport, n), "engine process: 
");
+    run();
+}
+
+void engine::run() {
+    for (pn_event_t *e = impl_->peek(); e; e = impl_->peek()) {
+        switch (pn_event_type(e)) {
+          case PN_CONNECTION_REMOTE_CLOSE:
+            pn_transport_close_tail(impl_->transport);
+            break;
+          case PN_CONNECTION_LOCAL_CLOSE:
+            pn_transport_close_head(impl_->transport);
+            break;
+          default:
+            break;
+        }
+        messaging_event mevent(e, pn_event_type(e), this);
+        mevent.dispatch(impl_->handler);
+        impl_->pop();
+    }
+}
+
+buffer<const char> engine::output() {
+    ssize_t n = pn_transport_pending(impl_->transport);
+    if (n <= 0)
+        return buffer<const char>();
+    return buffer<const char>(pn_transport_head(impl_->transport), n);
+}
+
+void engine::sent(size_t n) {
+    pn_transport_pop(impl_->transport, n);
+    run();
+}
+
+void engine::close_output() {
+    pn_transport_close_head(impl_->transport);
+    run();
+}
+
+bool engine::closed() const {
+    return pn_transport_closed(impl_->transport);
+}
+
+class connection& engine::connection() const {
+    return *connection::cast(impl_->connection);
+}
+
+std::string engine::id() const { return connection().container_id(); }
+
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/event.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/event.cpp 
b/proton-c/bindings/cpp/src/event.cpp
index d79df33..a732cc4 100644
--- a/proton-c/bindings/cpp/src/event.cpp
+++ b/proton-c/bindings/cpp/src/event.cpp
@@ -38,11 +38,20 @@ event::event() {}
 
 event::~event() {}
 
+event_loop &event::event_loop() const {
+    throw error(MSG("No event_loop context for event"));
+}
+
 container &event::container() const {
     // Subclasses to override as appropriate
     throw error(MSG("No container context for event"));
 }
 
+engine &event::engine() const {
+    // Subclasses to override as appropriate
+    throw error(MSG("No engine context for event"));
+}
+
 connection &event::connection() const {
     throw error(MSG("No connection context for event"));
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/messaging_event.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_event.cpp 
b/proton-c/bindings/cpp/src/messaging_event.cpp
index 3e59b1c..3a0514d 100644
--- a/proton-c/bindings/cpp/src/messaging_event.cpp
+++ b/proton-c/bindings/cpp/src/messaging_event.cpp
@@ -40,12 +40,12 @@
 
 namespace proton {
 
-messaging_event::messaging_event(pn_event_t *ce, proton_event::event_type t, 
class container &c) :
-    proton_event(ce, t, c), type_(messaging_event::PROTON), parent_event_(0), 
message_(0)
+messaging_event::messaging_event(pn_event_t *ce, proton_event::event_type t, 
class event_loop *el) :
+    proton_event(ce, t, el), type_(messaging_event::PROTON), parent_event_(0), 
message_(0)
 {}
 
 messaging_event::messaging_event(event_type t, proton_event &p) :
-    proton_event(NULL, PN_EVENT_NONE, p.container()), type_(t), 
parent_event_(&p), message_(0)
+    proton_event(NULL, PN_EVENT_NONE, &p.event_loop()), type_(t), 
parent_event_(&p), message_(0)
 {
     if (type_ == messaging_event::PROTON)
         throw error(MSG("invalid messaging event type"));

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/messaging_event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_event.hpp 
b/proton-c/bindings/cpp/src/messaging_event.hpp
index 84cf3f3..f97d1a5 100644
--- a/proton-c/bindings/cpp/src/messaging_event.hpp
+++ b/proton-c/bindings/cpp/src/messaging_event.hpp
@@ -83,7 +83,7 @@ class messaging_event : public proton_event
         TRANSPORT_CLOSED
     };
 
-    messaging_event(pn_event_t *ce, proton_event::event_type t, class 
container &c);
+    messaging_event(pn_event_t *, proton_event::event_type, class event_loop 
*);
     messaging_event(event_type t, proton_event &parent);
     ~messaging_event();
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/proton_event.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proton_event.cpp 
b/proton-c/bindings/cpp/src/proton_event.cpp
index 649d908..068d245 100644
--- a/proton-c/bindings/cpp/src/proton_event.cpp
+++ b/proton-c/bindings/cpp/src/proton_event.cpp
@@ -24,6 +24,7 @@
 #include "proton/link.h"
 
 #include "proton/container.hpp"
+#include "proton/engine.hpp"
 #include "proton/delivery.hpp"
 #include "proton/error.hpp"
 #include "proton_event.hpp"
@@ -36,10 +37,10 @@
 
 namespace proton {
 
-proton_event::proton_event(pn_event_t *ce, proton_event::event_type t, class 
container &c) :
+proton_event::proton_event(pn_event_t *ce, proton_event::event_type t, class 
event_loop *el) :
     pn_event_(ce),
     type_(t),
-    container_(c)
+    event_loop_(el)
 {}
 
 int proton_event::type() const { return type_; }
@@ -48,7 +49,25 @@ std::string proton_event::name() const { return 
pn_event_type_name(pn_event_type
 
 pn_event_t *proton_event::pn_event() const { return pn_event_; }
 
-container &proton_event::container() const { return container_; }
+event_loop &proton_event::event_loop() const {
+    if (!event_loop_)
+        throw error(MSG("No event_loop context for this event"));
+    return *event_loop_;
+}
+
+container &proton_event::container() const {
+    class container *c = dynamic_cast<class container*>(event_loop_);
+    if (!c)
+        throw error(MSG("No container context for this event"));
+    return *c;
+}
+
+engine &proton_event::engine() const {
+    class engine *e = dynamic_cast<class engine*>(event_loop_);
+    if (!e)
+        throw error(MSG("No engine context for this event"));
+    return *e;
+}
 
 connection &proton_event::connection() const {
     pn_connection_t *conn = pn_event_connection(pn_event());

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/proton_event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proton_event.hpp 
b/proton-c/bindings/cpp/src/proton_event.hpp
index 24ca5e4..e382d8c 100644
--- a/proton-c/bindings/cpp/src/proton_event.hpp
+++ b/proton-c/bindings/cpp/src/proton_event.hpp
@@ -50,182 +50,182 @@ class proton_event : public event
      * Defined as a programming convenience. No event of this type will
      * ever be generated.
      */
-    PN_CPP_EXTERN static const event_type EVENT_NONE;
+    static const event_type EVENT_NONE;
 
     /**
      * A reactor has been started. Events of this type point to the reactor.
      */
-    PN_CPP_EXTERN static const event_type REACTOR_INIT;
+    static const event_type REACTOR_INIT;
 
     /**
      * A reactor has no more events to process. Events of this type
      * point to the reactor.
      */
-    PN_CPP_EXTERN static const event_type REACTOR_QUIESCED;
+    static const event_type REACTOR_QUIESCED;
 
     /**
      * A reactor has been stopped. Events of this type point to the reactor.
      */
-    PN_CPP_EXTERN static const event_type REACTOR_FINAL;
+    static const event_type REACTOR_FINAL;
 
     /**
      * A timer event has occurred.
      */
-    PN_CPP_EXTERN static const event_type TIMER_TASK;
+    static const event_type TIMER_TASK;
 
     /**
      * The connection has been created. This is the first event that
      * will ever be issued for a connection. Events of this type point
      * to the relevant connection.
      */
-    PN_CPP_EXTERN static const event_type CONNECTION_INIT;
+    static const event_type CONNECTION_INIT;
 
     /**
      * The connection has been bound to a transport. This event is
      * issued when the transport::bind() is called.
      */
-    PN_CPP_EXTERN static const event_type CONNECTION_BOUND;
+    static const event_type CONNECTION_BOUND;
 
     /**
      * The connection has been unbound from its transport. This event is
      * issued when transport::unbind() is called.
      */
-    PN_CPP_EXTERN static const event_type CONNECTION_UNBOUND;
+    static const event_type CONNECTION_UNBOUND;
 
     /**
      * The local connection endpoint has been closed. Events of this
      * type point to the relevant connection.
      */
-    PN_CPP_EXTERN static const event_type CONNECTION_LOCAL_OPEN;
+    static const event_type CONNECTION_LOCAL_OPEN;
 
     /**
      * The remote endpoint has opened the connection. Events of this
      * type point to the relevant connection.
      */
-    PN_CPP_EXTERN static const event_type CONNECTION_REMOTE_OPEN;
+    static const event_type CONNECTION_REMOTE_OPEN;
 
     /**
      * The local connection endpoint has been closed. Events of this
      * type point to the relevant connection.
      */
-    PN_CPP_EXTERN static const event_type CONNECTION_LOCAL_CLOSE;
+    static const event_type CONNECTION_LOCAL_CLOSE;
 
     /**
      *  The remote endpoint has closed the connection. Events of this
      *  type point to the relevant connection.
      */
-    PN_CPP_EXTERN static const event_type CONNECTION_REMOTE_CLOSE;
+    static const event_type CONNECTION_REMOTE_CLOSE;
 
     /**
      * The connection has been freed and any outstanding processing has
      * been completed. This is the final event that will ever be issued
      * for a connection.
      */
-    PN_CPP_EXTERN static const event_type CONNECTION_FINAL;
+    static const event_type CONNECTION_FINAL;
 
     /**
      * The session has been created. This is the first event that will
      * ever be issued for a session.
      */
-    PN_CPP_EXTERN static const event_type SESSION_INIT;
+    static const event_type SESSION_INIT;
 
     /**
      * The local session endpoint has been opened. Events of this type
      * point to the relevant session.
      */
-    PN_CPP_EXTERN static const event_type SESSION_LOCAL_OPEN;
+    static const event_type SESSION_LOCAL_OPEN;
 
     /**
      * The remote endpoint has opened the session. Events of this type
      * point to the relevant session.
      */
-    PN_CPP_EXTERN static const event_type SESSION_REMOTE_OPEN;
+    static const event_type SESSION_REMOTE_OPEN;
 
     /**
      * The local session endpoint has been closed. Events of this type
      * point ot the relevant session.
      */
-    PN_CPP_EXTERN static const event_type SESSION_LOCAL_CLOSE;
+    static const event_type SESSION_LOCAL_CLOSE;
 
     /**
      * The remote endpoint has closed the session. Events of this type
      * point to the relevant session.
      */
-    PN_CPP_EXTERN static const event_type SESSION_REMOTE_CLOSE;
+    static const event_type SESSION_REMOTE_CLOSE;
 
     /**
      * The session has been freed and any outstanding processing has
      * been completed. This is the final event that will ever be issued
      * for a session.
      */
-    PN_CPP_EXTERN static const event_type SESSION_FINAL;
+    static const event_type SESSION_FINAL;
 
     /**
      * The link has been created. This is the first event that will ever
      * be issued for a link.
      */
-    PN_CPP_EXTERN static const event_type LINK_INIT;
+    static const event_type LINK_INIT;
 
     /**
      * The local link endpoint has been opened. Events of this type
      * point ot the relevant link.
      */
-    PN_CPP_EXTERN static const event_type LINK_LOCAL_OPEN;
+    static const event_type LINK_LOCAL_OPEN;
 
     /**
      * The remote endpoint has opened the link. Events of this type
      * point to the relevant link.
      */
-    PN_CPP_EXTERN static const event_type LINK_REMOTE_OPEN;
+    static const event_type LINK_REMOTE_OPEN;
 
     /**
      * The local link endpoint has been closed. Events of this type
      * point ot the relevant link.
      */
-    PN_CPP_EXTERN static const event_type LINK_LOCAL_CLOSE;
+    static const event_type LINK_LOCAL_CLOSE;
 
     /**
      * The remote endpoint has closed the link. Events of this type
      * point to the relevant link.
      */
-    PN_CPP_EXTERN static const event_type LINK_REMOTE_CLOSE;
+    static const event_type LINK_REMOTE_CLOSE;
 
     /**
      * The local link endpoint has been detached. Events of this type
      * point to the relevant link.
      */
-    PN_CPP_EXTERN static const event_type LINK_LOCAL_DETACH;
+    static const event_type LINK_LOCAL_DETACH;
 
     /**
      * The remote endpoint has detached the link. Events of this type
      * point to the relevant link.
      */
-    PN_CPP_EXTERN static const event_type LINK_REMOTE_DETACH;
+    static const event_type LINK_REMOTE_DETACH;
 
     /**
      * The flow control state for a link has changed. Events of this
      * type point to the relevant link.
      */
-    PN_CPP_EXTERN static const event_type LINK_FLOW;
+    static const event_type LINK_FLOW;
 
     /**
      * The link has been freed and any outstanding processing has been
      * completed. This is the final event that will ever be issued for a
      * link. Events of this type point to the relevant link.
      */
-    PN_CPP_EXTERN static const event_type LINK_FINAL;
+    static const event_type LINK_FINAL;
 
     /**
      * A delivery has been created or updated. Events of this type point
      * to the relevant delivery.
      */
-    PN_CPP_EXTERN static const event_type DELIVERY;
+    static const event_type DELIVERY;
 
     /**
      * The transport has new data to read and/or write. Events of this
      * type point to the relevant transport.
      */
-    PN_CPP_EXTERN static const event_type TRANSPORT;
+    static const event_type TRANSPORT;
 
     /**
      * The transport has authenticated, if this is received by a server
@@ -233,64 +233,67 @@ class proton_event : public event
      * and transport::user() can be used to obtain the authenticated
      * user.
      */
-    PN_CPP_EXTERN static const event_type TRANSPORT_AUTHENTICATED;
+    static const event_type TRANSPORT_AUTHENTICATED;
 
     /**
      * Indicates that a transport error has occurred. Use
      * transport::condition() to access the details of the error
      * from the associated transport.
      */
-    PN_CPP_EXTERN static const event_type TRANSPORT_ERROR;
+    static const event_type TRANSPORT_ERROR;
 
     /**
      * Indicates that the head of the transport has been closed. This
      * means the transport will never produce more bytes for output to
      * the network. Events of this type point to the relevant transport.
      */
-    PN_CPP_EXTERN static const event_type TRANSPORT_HEAD_CLOSED;
+    static const event_type TRANSPORT_HEAD_CLOSED;
 
     /**
      * Indicates that the tail of the transport has been closed. This
      * means the transport will never be able to process more bytes from
      * the network. Events of this type point to the relevant transport.
      */
-    PN_CPP_EXTERN static const event_type TRANSPORT_TAIL_CLOSED;
+    static const event_type TRANSPORT_TAIL_CLOSED;
 
     /**
      * Indicates that the both the head and tail of the transport are
      * closed. Events of this type point to the relevant transport.
      */
-    PN_CPP_EXTERN static const event_type TRANSPORT_CLOSED;
+    static const event_type TRANSPORT_CLOSED;
 
-    PN_CPP_EXTERN static const event_type SELECTABLE_INIT;
-    PN_CPP_EXTERN static const event_type SELECTABLE_UPDATED;
-    PN_CPP_EXTERN static const event_type SELECTABLE_READABLE;
-    PN_CPP_EXTERN static const event_type SELECTABLE_WRITABLE;
-    PN_CPP_EXTERN static const event_type SELECTABLE_ERROR;
-    PN_CPP_EXTERN static const event_type SELECTABLE_EXPIRED;
-    PN_CPP_EXTERN static const event_type SELECTABLE_FINAL;
+    static const event_type SELECTABLE_INIT;
+    static const event_type SELECTABLE_UPDATED;
+    static const event_type SELECTABLE_READABLE;
+    static const event_type SELECTABLE_WRITABLE;
+    static const event_type SELECTABLE_ERROR;
+    static const event_type SELECTABLE_EXPIRED;
+    static const event_type SELECTABLE_FINAL;
 
     ///@}
 
-    virtual PN_CPP_EXTERN void dispatch(handler &h);
-    virtual PN_CPP_EXTERN class container &container() const;
-    virtual PN_CPP_EXTERN class connection &connection() const;
-    virtual PN_CPP_EXTERN class sender& sender() const;
-    virtual PN_CPP_EXTERN class receiver& receiver() const;
-    virtual PN_CPP_EXTERN class link& link() const;
-    virtual PN_CPP_EXTERN class delivery& delivery() const;
+    virtual void dispatch(handler &h);
+    virtual class event_loop &event_loop() const;
+    virtual class container &container() const;
+    virtual class engine &engine() const;
+    virtual class connection &connection() const;
+    virtual class sender& sender() const;
+    virtual class receiver& receiver() const;
+    virtual class link& link() const;
+    virtual class delivery& delivery() const;
 
     /** Get type of event */
-    PN_CPP_EXTERN event_type type() const;
+    event_type type() const;
 
-    PN_CPP_EXTERN pn_event_t* pn_event() const;
+    pn_event_t* pn_event() const;
 
   protected:
-    PN_CPP_EXTERN proton_event(pn_event_t *ce, proton_event::event_type t, 
class container &c);
+    proton_event(pn_event_t *, proton_event::event_type, class event_loop *);
+
   private:
     mutable pn_event_t *pn_event_;
     event_type type_;
-    class container &container_;
+    class event_loop *event_loop_;
 };
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/193a7dd5/proton-c/bindings/cpp/src/url.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/url.cpp 
b/proton-c/bindings/cpp/src/url.cpp
index b9407de..ec02e53 100644
--- a/proton-c/bindings/cpp/src/url.cpp
+++ b/proton-c/bindings/cpp/src/url.cpp
@@ -22,8 +22,7 @@
 #include "proton/error.hpp"
 #include "proton/url.hpp"
 #include "proton/url.h"
-#include <ostream>
-#include <istream>
+#include <sstream>
 
 namespace proton {
 
@@ -94,6 +93,18 @@ void url::defaults() {
 const std::string url::AMQP("amqp");
 const std::string url::AMQPS("amqps");
 
+uint16_t url::port_int() const {
+    // TODO aconway 2015-10-27: full service name lookup
+    if (port() == AMQP) return 5672;
+    if (port() == AMQPS) return 5671;
+    std::istringstream is(port());
+    uint16_t result;
+    is >> result;
+    if (is.fail())
+        throw url_error("invalid port '" + port() + "'");
+    return result;
+}
+
 std::ostream& operator<<(std::ostream& o, const url& u) { return o << u.str(); 
}
 
 std::istream& operator>>(std::istream& i, url& u) {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to