Repository: qpid-proton
Updated Branches:
  refs/heads/cjansen-cpp-client c4e2e596d -> 8e1757ebc


PROTON-865: C++ blocking receiver


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/4339b1c7
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/4339b1c7
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/4339b1c7

Branch: refs/heads/cjansen-cpp-client
Commit: 4339b1c7a7d2813ec091578af5508885ab29f43c
Parents: c4e2e59
Author: Clifford Jansen <[email protected]>
Authored: Sun Jul 26 10:13:30 2015 -0700
Committer: Clifford Jansen <[email protected]>
Committed: Sun Jul 26 10:13:30 2015 -0700

----------------------------------------------------------------------
 examples/cpp/helloworld_blocking.cpp            |  35 ++-----
 proton-c/bindings/cpp/CMakeLists.txt            |   2 +
 .../cpp/include/proton/blocking_connection.hpp  |   3 +
 .../cpp/include/proton/blocking_receiver.hpp    |  60 +++++++++++
 .../bindings/cpp/include/proton/container.hpp   |   5 +-
 .../bindings/cpp/include/proton/delivery.hpp    |   3 +-
 proton-c/bindings/cpp/include/proton/event.hpp  |   2 +
 .../cpp/include/proton/messaging_event.hpp      |   1 +
 .../cpp/include/proton/proton_event.hpp         |   1 +
 .../bindings/cpp/include/proton/receiver.hpp    |   1 +
 .../bindings/cpp/src/blocking_connection.cpp    |  19 ++++
 .../cpp/src/blocking_connection_impl.hpp        |   6 +-
 proton-c/bindings/cpp/src/blocking_link.cpp     |   4 +-
 proton-c/bindings/cpp/src/blocking_receiver.cpp | 104 +++++++++++++++++++
 proton-c/bindings/cpp/src/blocking_sender.cpp   |   2 +-
 proton-c/bindings/cpp/src/connection_impl.cpp   |   4 +
 proton-c/bindings/cpp/src/container.cpp         |   5 +-
 proton-c/bindings/cpp/src/container_impl.cpp    |  25 ++++-
 proton-c/bindings/cpp/src/container_impl.hpp    |   3 +-
 proton-c/bindings/cpp/src/delivery.cpp          |   4 +
 proton-c/bindings/cpp/src/event.cpp             |   4 +
 proton-c/bindings/cpp/src/fetcher.cpp           |  92 ++++++++++++++++
 proton-c/bindings/cpp/src/fetcher.hpp           |  58 +++++++++++
 proton-c/bindings/cpp/src/messaging_event.cpp   |   8 ++
 proton-c/bindings/cpp/src/proton_event.cpp      |   7 ++
 proton-c/bindings/cpp/src/receiver.cpp          |   3 +
 26 files changed, 419 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/examples/cpp/helloworld_blocking.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/helloworld_blocking.cpp 
b/examples/cpp/helloworld_blocking.cpp
index 2b6b4db..9f56fa2 100644
--- a/examples/cpp/helloworld_blocking.cpp
+++ b/examples/cpp/helloworld_blocking.cpp
@@ -22,43 +22,28 @@
 #include "proton/container.hpp"
 #include "proton/messaging_handler.hpp"
 #include "proton/blocking_sender.hpp"
+#include "proton/blocking_receiver.hpp"
+#include "proton/duration.hpp"
 
 #include <iostream>
 
-class hello_world_blocking : public proton::messaging_handler {
-  private:
-    proton::url url;
-
-  public:
-
-    hello_world_blocking(const proton::url& u) : url(u) {}
-
-    void on_start(proton::event &e) {
-        proton::connection conn = e.container().connect(url);
-        e.container().create_receiver(conn, url.path());
-    }
-
-    void on_message(proton::event &e) {
-        std::cout << e.message().body() << std::endl;
-        e.connection().close();
-    }
-
-};
-
 int main(int argc, char **argv) {
     try {
         proton::url url(argc > 1 ? argv[1] : "127.0.0.1:5672/examples");
-
         proton::blocking_connection conn(url);
+        proton::blocking_receiver receiver = conn.create_receiver(url.path());
         proton::blocking_sender sender = conn.create_sender(url.path());
+
         proton::message m;
         m.body("Hello World!");
         sender.send(m);
-        conn.close();
 
-        // TODO Temporary hack until blocking receiver available
-        hello_world_blocking hw(url);
-        proton::container(hw).run();
+        proton::duration timeout(30000);
+        proton::message m2 = receiver.receive(timeout);
+        std::cout << m2.body() << std::endl;
+        receiver.accept();
+
+        conn.close();
         return 0;
     } catch (const std::exception& e) {
         std::cerr << e.what() << std::endl;

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/CMakeLists.txt 
b/proton-c/bindings/cpp/CMakeLists.txt
index 6b56a8e..fcb0fb1 100644
--- a/proton-c/bindings/cpp/CMakeLists.txt
+++ b/proton-c/bindings/cpp/CMakeLists.txt
@@ -64,10 +64,12 @@ set(qpid-proton-cpp-source
   src/value.cpp
   src/values.cpp
   src/proton_bits.cpp
+  src/fetcher.cpp
   src/blocking_connection.cpp
   src/blocking_connection_impl.cpp
   src/blocking_link.cpp
   src/blocking_sender.cpp
+  src/blocking_receiver.cpp
   src/contexts.cpp
   src/types.cpp
   )

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/blocking_connection.hpp 
b/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
index 9aa456f..b6f0499 100644
--- a/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
+++ b/proton-c/bindings/cpp/include/proton/blocking_connection.hpp
@@ -38,6 +38,7 @@ class container;
 class blocking_connection_impl;
 class ssl_domain;
 class blocking_sender;
+class blocking_receiver;
 class wait_condition;
 
 // TODO documentation
@@ -54,6 +55,8 @@ class blocking_connection : public 
handle<blocking_connection_impl>
     PN_CPP_EXTERN void close();
 
     PN_CPP_EXTERN blocking_sender create_sender(const std::string &address, 
handler *h=0);
+    PN_CPP_EXTERN blocking_receiver create_receiver(const std::string 
&address, int credit = 0,
+                                                    bool dynamic = false, 
std::string name = std::string());
     PN_CPP_EXTERN void wait(wait_condition &condition);
     PN_CPP_EXTERN void wait(wait_condition &condition, std::string &msg, 
duration timeout=duration::FOREVER);
     PN_CPP_EXTERN duration timeout();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp 
b/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp
new file mode 100644
index 0000000..319fd93
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/blocking_receiver.hpp
@@ -0,0 +1,60 @@
+#ifndef PROTON_CPP_BLOCKING_RECEIVER_HPP
+#define PROTON_CPP_BLOCKING_RECEIVER_HPP
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/export.hpp"
+#include "proton/container.hpp"
+#include "proton/blocking_link.hpp"
+#include "proton/duration.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/types.h"
+#include "proton/delivery.h"
+#include <string>
+
+namespace proton {
+
+class blocking_connection;
+class blocking_link;
+class fetcher;
+
+class blocking_receiver : public blocking_link
+{
+  public:
+    PN_CPP_EXTERN blocking_receiver(const blocking_receiver&);
+    PN_CPP_EXTERN blocking_receiver& operator=(const blocking_receiver&);
+    PN_CPP_EXTERN ~blocking_receiver();
+    PN_CPP_EXTERN message receive();
+    PN_CPP_EXTERN message receive(duration timeout);
+    PN_CPP_EXTERN void accept();
+    PN_CPP_EXTERN void reject();
+    PN_CPP_EXTERN void release(bool delivered = true);
+    PN_CPP_EXTERN void settle();
+    PN_CPP_EXTERN void settle(delivery::state state);
+  private:
+    blocking_receiver(blocking_connection &c, receiver &l, fetcher &f, int 
credit);
+    fetcher &fetcher_;
+    friend class blocking_connection;
+};
+
+}
+
+#endif  /*!PROTON_CPP_BLOCKING_RECEIVER_HPP*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp 
b/proton-c/bindings/cpp/include/proton/container.hpp
index ec55167..a0ca59a 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -81,9 +81,9 @@ class container : public handle<container_impl>
     PN_CPP_EXTERN sender create_sender(const proton::url &);
 
     /** Create a receiver on connection with target=addr and optional handler 
h */
-    PN_CPP_EXTERN receiver create_receiver(connection &connection, const 
std::string &addr, handler *h=0);
+    PN_CPP_EXTERN receiver create_receiver(connection &connection, const 
std::string &addr, bool dynamic=false, handler *h=0);
 
-    /** Create a receiver on connection with source=addr and optional handler 
h */
+    /** Create a receiver on connection with source=url.path() */
     PN_CPP_EXTERN receiver create_receiver(const url &);
 
     /** Open a connection to url and create a receiver with source=url.path() 
*/
@@ -103,6 +103,7 @@ class container : public handle<container_impl>
 
     PN_CPP_EXTERN void wakeup();
     PN_CPP_EXTERN bool is_quiesced();
+    PN_CPP_EXTERN void yield();
 private:
    friend class private_impl_ref<container>;
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/delivery.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/delivery.hpp 
b/proton-c/bindings/cpp/include/proton/delivery.hpp
index 3f4dafc..3fa3069 100644
--- a/proton-c/bindings/cpp/include/proton/delivery.hpp
+++ b/proton-c/bindings/cpp/include/proton/delivery.hpp
@@ -56,7 +56,8 @@ class delivery : public proton_handle<pn_delivery_t>
     /** Settle the delivery, informs the remote end. */
     PN_CPP_EXTERN void settle();
 
-    // TODO aconway 2015-07-15: add update() here?
+    /** Set the local state of the delivery. */
+    PN_CPP_EXTERN void update(delivery::state state);
 
     PN_CPP_EXTERN pn_delivery_t *pn_delivery();
   private:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/event.hpp 
b/proton-c/bindings/cpp/include/proton/event.hpp
index c4c5d7c..22866e1 100644
--- a/proton-c/bindings/cpp/include/proton/event.hpp
+++ b/proton-c/bindings/cpp/include/proton/event.hpp
@@ -52,6 +52,8 @@ class event {
     virtual PN_CPP_EXTERN class receiver receiver();
     /// Get link @throws error if no link.
     virtual PN_CPP_EXTERN class link link();
+    /// Get delivey @throws error if no delivery.
+    virtual PN_CPP_EXTERN class delivery delivery();
     /// Get message @throws error if no message.
     virtual PN_CPP_EXTERN class message message();
     /// Get message @throws error if no message.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/messaging_event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/messaging_event.hpp 
b/proton-c/bindings/cpp/include/proton/messaging_event.hpp
index 950c389..e27003c 100644
--- a/proton-c/bindings/cpp/include/proton/messaging_event.hpp
+++ b/proton-c/bindings/cpp/include/proton/messaging_event.hpp
@@ -89,6 +89,7 @@ class messaging_event : public proton_event
     virtual PN_CPP_EXTERN class receiver receiver();
     virtual PN_CPP_EXTERN class link link();
     virtual PN_CPP_EXTERN class message message();
+    virtual PN_CPP_EXTERN class delivery delivery();
     virtual PN_CPP_EXTERN void message(class message &);
     PN_CPP_EXTERN event_type type() const;
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/proton_event.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/proton_event.hpp 
b/proton-c/bindings/cpp/include/proton/proton_event.hpp
index d811ae5..1243717 100644
--- a/proton-c/bindings/cpp/include/proton/proton_event.hpp
+++ b/proton-c/bindings/cpp/include/proton/proton_event.hpp
@@ -274,6 +274,7 @@ class proton_event : public event
     virtual PN_CPP_EXTERN class sender sender();
     virtual PN_CPP_EXTERN class receiver receiver();
     virtual PN_CPP_EXTERN class link link();
+    virtual PN_CPP_EXTERN class delivery delivery();
 
     /** Get type of event */
     PN_CPP_EXTERN event_type type();

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/include/proton/receiver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/receiver.hpp 
b/proton-c/bindings/cpp/include/proton/receiver.hpp
index 4f2df0f..1974cd0 100644
--- a/proton-c/bindings/cpp/include/proton/receiver.hpp
+++ b/proton-c/bindings/cpp/include/proton/receiver.hpp
@@ -38,6 +38,7 @@ class receiver : public link
     PN_CPP_EXTERN receiver(pn_link_t *lnk);
     PN_CPP_EXTERN receiver();
     PN_CPP_EXTERN receiver(const link& c);
+    PN_CPP_EXTERN void flow(int count);
   protected:
     PN_CPP_EXTERN virtual void verify_type(pn_link_t *l);
 };

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/blocking_connection.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_connection.cpp 
b/proton-c/bindings/cpp/src/blocking_connection.cpp
index 02cec85..dd1aebb 100644
--- a/proton-c/bindings/cpp/src/blocking_connection.cpp
+++ b/proton-c/bindings/cpp/src/blocking_connection.cpp
@@ -21,9 +21,11 @@
 #include "proton/container.hpp"
 #include "proton/blocking_connection.hpp"
 #include "proton/blocking_sender.hpp"
+#include "proton/blocking_receiver.hpp"
 #include "proton/messaging_handler.hpp"
 #include "proton/url.hpp"
 #include "proton/error.hpp"
+#include "fetcher.hpp"
 #include "msg.hpp"
 #include "blocking_connection_impl.hpp"
 #include "private_impl_ref.hpp"
@@ -57,6 +59,23 @@ blocking_sender blocking_connection::create_sender(const 
std::string &address, h
     return blocking_sender(*this, sender);
 }
 
+namespace {
+struct fetcher_guard{
+    fetcher_guard(fetcher &f) : fetcher_(f) { fetcher_.incref(); }
+    ~fetcher_guard() { fetcher_.decref(); }
+    fetcher& fetcher_;
+};
+}
+
+blocking_receiver blocking_connection::create_receiver(const std::string 
&address, int credit,
+                                                       bool dynamic, 
std::string name) {
+    fetcher *f = new fetcher(*this, credit);
+    fetcher_guard fg(*f);
+    receiver receiver = impl_->container_.create_receiver(impl_->connection_, 
address, dynamic, f);
+    blocking_receiver brcv(*this, receiver, *f, credit);
+    return brcv;
+}
+
 duration blocking_connection::timeout() { return impl_->timeout(); }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp 
b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
index edb08a6..7e8c031 100644
--- a/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
+++ b/proton-c/bindings/cpp/src/blocking_connection_impl.hpp
@@ -1,5 +1,5 @@
-#ifndef PROTON_CPP_CONNECTIONIMPL_H
-#define PROTON_CPP_CONNECTIONIMPL_H
+#ifndef PROTON_CPP_BLOCKINGCONNECTIONIMPL_H
+#define PROTON_CPP_BLOCKINGCONNECTIONIMPL_H
 
 /*
  *
@@ -59,4 +59,4 @@ class ssl_domain;
 
 }
 
-#endif  /*!PROTON_CPP_CONNECTIONIMPL_H*/
+#endif  /*!PROTON_CPP_BLOCKINGCONNECTIONIMPL_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/blocking_link.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_link.cpp 
b/proton-c/bindings/cpp/src/blocking_link.cpp
index b9f23c4..c7f3551 100644
--- a/proton-c/bindings/cpp/src/blocking_link.cpp
+++ b/proton-c/bindings/cpp/src/blocking_link.cpp
@@ -55,6 +55,7 @@ blocking_link::blocking_link(blocking_connection *c, 
pn_link_t *pnl) : connectio
     std::string msg = "Opening link " + link_.name();
     link_opened link_opened(link_.pn_link());
     connection_.wait(link_opened, msg);
+    check_closed();
 }
 
 blocking_link::~blocking_link() {}
@@ -70,8 +71,7 @@ void blocking_link::check_closed() {
     pn_link_t * pn_link = link_.pn_link();
     if (pn_link_state(pn_link) & PN_REMOTE_CLOSED) {
         link_.close();
-        // TODO: link_detached exception
-        throw error(MSG("link detached"));
+        throw error(MSG("Link detached: " << link_.name()));
     }
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/blocking_receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_receiver.cpp 
b/proton-c/bindings/cpp/src/blocking_receiver.cpp
new file mode 100644
index 0000000..042eb29
--- /dev/null
+++ b/proton-c/bindings/cpp/src/blocking_receiver.cpp
@@ -0,0 +1,104 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/blocking_receiver.hpp"
+#include "proton/blocking_connection.hpp"
+#include "proton/wait_condition.hpp"
+#include "proton/error.hpp"
+#include "fetcher.hpp"
+#include "msg.hpp"
+
+
+namespace proton {
+
+namespace {
+
+struct fetcher_has_message : public wait_condition {
+    fetcher_has_message(fetcher &f) : fetcher_(f) {}
+    bool achieved() { return fetcher_.has_message(); }
+    fetcher &fetcher_;
+};
+
+} // namespace
+
+
+blocking_receiver::blocking_receiver(blocking_connection &c, receiver &l, 
fetcher &f, int credit)
+    : blocking_link(&c, l.pn_link()), fetcher_(f) {
+    std::string sa = link_.source().address();
+    std::string rsa = link_.remote_source().address();
+    if (sa.empty() || sa.compare(rsa) != 0) {
+        wait_for_closed();
+        link_.close();
+        std::string txt = "Failed to open receiver " + link_.name() + ", 
source does not match";
+        throw error(MSG(txt));
+    }
+    if (credit)
+        pn_link_flow(link_.pn_link(), credit);
+    fetcher_.incref();
+}
+
+blocking_receiver::blocking_receiver(const blocking_receiver& r) : 
blocking_link(r), fetcher_(r.fetcher_) {
+    fetcher_.incref();
+}
+blocking_receiver& blocking_receiver::operator=(const blocking_receiver& r) {
+    if (this == &r) return *this;
+    fetcher_ = r.fetcher_;
+    fetcher_.incref();
+    return *this;
+}
+blocking_receiver::~blocking_receiver() { fetcher_.decref(); }
+
+
+
+message blocking_receiver::receive(duration timeout) {
+    receiver rcv = link_;
+    if (!rcv.credit())
+        rcv.flow(1);
+    std::string txt = "Receiving on receiver " + link_.name();
+    fetcher_has_message cond(fetcher_);
+    connection_.wait(cond, txt, timeout);
+    return fetcher_.pop();
+}
+
+message blocking_receiver::receive() {
+    // Use default timeout
+    return receive(connection_.timeout());
+}
+
+void blocking_receiver::accept() {
+    settle(delivery::ACCEPTED);
+}
+
+void blocking_receiver::reject() {
+    settle(delivery::REJECTED);
+}
+
+void blocking_receiver::release(bool delivered) {
+    if (delivered)
+        settle(delivery::MODIFIED);
+    else
+        settle(delivery::RELEASED);
+}
+
+void blocking_receiver::settle(delivery::state state = delivery::NONE) {
+    fetcher_.settle(state);
+}
+
+} // namespace

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/blocking_sender.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/blocking_sender.cpp 
b/proton-c/bindings/cpp/src/blocking_sender.cpp
index 2ab1ef1..8fea98a 100644
--- a/proton-c/bindings/cpp/src/blocking_sender.cpp
+++ b/proton-c/bindings/cpp/src/blocking_sender.cpp
@@ -44,7 +44,7 @@ blocking_sender::blocking_sender(blocking_connection &c, 
sender &l) : blocking_l
         wait_for_closed();
         link_.close();
         std::string txt = "Failed to open sender " + link_.name() + ", target 
does not match";
-        throw error(MSG("container not started"));
+        throw error(MSG(txt));
     }
 }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/connection_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_impl.cpp 
b/proton-c/bindings/cpp/src/connection_impl.cpp
index 51da569..e2f4608 100644
--- a/proton-c/bindings/cpp/src/connection_impl.cpp
+++ b/proton-c/bindings/cpp/src/connection_impl.cpp
@@ -103,6 +103,10 @@ connection &connection_impl::connection() {
 
 container &connection_impl::container() { return (container_); }
 
+
+// TODO: Rework this.  Rename and document excellently for user handlers on 
connections.
+// Better: provide general solution for handlers that delete before the C 
reactor object
+// has finished sending events.
 void connection_impl::reactor_detach() {
     // "save" goes out of scope last, preventing possible recursive destructor
     // confusion with reactor_reference.

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/container.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container.cpp 
b/proton-c/bindings/cpp/src/container.cpp
index 07cb971..a424c0b 100644
--- a/proton-c/bindings/cpp/src/container.cpp
+++ b/proton-c/bindings/cpp/src/container.cpp
@@ -72,8 +72,8 @@ sender container::create_sender(const proton::url &url) {
     return impl_->create_sender(url);
 }
 
-receiver container::create_receiver(connection &connection, const std::string 
&addr, handler *h) {
-    return impl_->create_receiver(connection, addr, h);
+receiver container::create_receiver(connection &connection, const std::string 
&addr, bool dynamic, handler *h) {
+    return impl_->create_receiver(connection, addr, dynamic, h);
 }
 
 receiver container::create_receiver(const proton::url &url) {
@@ -91,5 +91,6 @@ bool container::process() { return impl_->process(); }
 void container::stop() { impl_->stop(); }
 void container::wakeup() { impl_->wakeup(); }
 bool container::is_quiesced() { return impl_->is_quiesced(); }
+void container::yield() { impl_->yield(); }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.cpp 
b/proton-c/bindings/cpp/src/container_impl.cpp
index 0858d73..e328251 100644
--- a/proton-c/bindings/cpp/src/container_impl.cpp
+++ b/proton-c/bindings/cpp/src/container_impl.cpp
@@ -148,6 +148,9 @@ void cpp_handler_dispatch(pn_handler_t *c_handler, 
pn_event_t *cevent, pn_event_
     container c(inbound_context::get(c_handler)->container_impl_);
     messaging_event mevent(cevent, type, c);
     mevent.dispatch(*inbound_context::get(c_handler)->cpp_handler_);
+    // Possible decref and deletion via a handler action on this event.
+    // return without further processing.
+    return;
 }
 
 void cpp_handler_cleanup(pn_handler_t *c_handler)
@@ -199,7 +202,7 @@ container_impl::~container_impl() {
 connection container_impl::connect(const proton::url &url, handler *h) {
     if (!reactor_) throw error(MSG("container not started"));
     container ctnr(this);
-    connection conn(ctnr, handler_);
+    connection conn(ctnr, h);
     connector *ctor = new connector(conn);
     // connector self-deletes depending on reconnect logic
     ctor->address(url);  // TODO: url vector
@@ -238,7 +241,9 @@ sender container_impl::create_sender(connection 
&connection, const std::string &
     pn_terminus_set_address(pn_link_target(lnk), addr.c_str());
     if (h) {
         pn_record_t *record = pn_link_attachments(lnk);
-        pn_record_set_handler(record, wrap_handler(h));
+        pn_handler_t *chandler = wrap_handler(h);
+        pn_record_set_handler(record, chandler);
+        pn_decref(chandler);
     }
     snd.open();
     return snd;
@@ -255,16 +260,21 @@ sender container_impl::create_sender(const proton::url 
&url) {
     return snd;
 }
 
-receiver container_impl::create_receiver(connection &connection, const 
std::string &addr, handler *h) {
+receiver container_impl::create_receiver(connection &connection, const 
std::string &addr, bool dynamic, handler *h) {
     if (!reactor_) throw error(MSG("container not started"));
     connection_impl *conn_impl = impl(connection);
     session session = default_session(conn_impl->pn_connection_, 
&conn_impl->default_session_);
     receiver rcv = session.create_receiver(container_id_ + '-' + addr);
     pn_link_t *lnk = rcv.pn_link();
-    pn_terminus_set_address(pn_link_source(lnk), addr.c_str());
+    pn_terminus_t *src = pn_link_source(lnk);
+    pn_terminus_set_address(src, addr.c_str());
+    if (dynamic)
+        pn_terminus_set_dynamic(src, true);
     if (h) {
         pn_record_t *record = pn_link_attachments(lnk);
-        pn_record_set_handler(record, wrap_handler(h));
+        pn_handler_t *chandler = wrap_handler(h);
+        pn_record_set_handler(record, chandler);
+        pn_decref(chandler);
     }
     rcv.open();
     return rcv;
@@ -360,4 +370,9 @@ bool container_impl::is_quiesced() {
     return pn_reactor_quiesced(reactor_);
 }
 
+void container_impl::yield() {
+    if (!reactor_) throw error(MSG("container not started"));
+    pn_reactor_yield(reactor_);
+}
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.hpp 
b/proton-c/bindings/cpp/src/container_impl.hpp
index 4378f7d..1ce27ee 100644
--- a/proton-c/bindings/cpp/src/container_impl.hpp
+++ b/proton-c/bindings/cpp/src/container_impl.hpp
@@ -48,7 +48,7 @@ class container_impl
     PN_CPP_EXTERN pn_reactor_t *reactor();
     PN_CPP_EXTERN sender create_sender(connection &connection, const 
std::string &addr, handler *h);
     PN_CPP_EXTERN sender create_sender(const url&);
-    PN_CPP_EXTERN receiver create_receiver(connection &connection, const 
std::string &addr, handler *h);
+    PN_CPP_EXTERN receiver create_receiver(connection &connection, const 
std::string &addr, bool dynamic, handler *h);
     PN_CPP_EXTERN receiver create_receiver(const url&);
     PN_CPP_EXTERN class acceptor listen(const url&);
     PN_CPP_EXTERN std::string container_id();
@@ -59,6 +59,7 @@ class container_impl
     void stop();
     void wakeup();
     bool is_quiesced();
+    void yield();
     pn_handler_t *wrap_handler(handler *h);
     static void incref(container_impl *);
     static void decref(container_impl *);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/delivery.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/delivery.cpp 
b/proton-c/bindings/cpp/src/delivery.cpp
index 7ff596f..6c87939 100644
--- a/proton-c/bindings/cpp/src/delivery.cpp
+++ b/proton-c/bindings/cpp/src/delivery.cpp
@@ -52,6 +52,10 @@ void delivery::settle() {
     pn_delivery_settle(impl_);
 }
 
+void delivery::update(delivery::state state) {
+    pn_delivery_update(impl_, state);
+}
+
 pn_delivery_t *delivery::pn_delivery() { return impl_; }
 
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/event.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/event.cpp 
b/proton-c/bindings/cpp/src/event.cpp
index e12d19c..2affed1 100644
--- a/proton-c/bindings/cpp/src/event.cpp
+++ b/proton-c/bindings/cpp/src/event.cpp
@@ -57,6 +57,10 @@ class link event::link() {
     throw error(MSG("No link context for event"));
 }
 
+class delivery event::delivery() {
+    throw error(MSG("No link context for event"));
+}
+
 class message event::message() {
     throw error(MSG("No message associated with event"));
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/fetcher.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/fetcher.cpp 
b/proton-c/bindings/cpp/src/fetcher.cpp
new file mode 100644
index 0000000..46ee18c
--- /dev/null
+++ b/proton-c/bindings/cpp/src/fetcher.cpp
@@ -0,0 +1,92 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "fetcher.hpp"
+#include "proton/event.hpp"
+
+namespace proton {
+
+fetcher::fetcher(blocking_connection &c, int prefetch) :
+    messaging_handler(prefetch, false), // no auto_accept
+    connection_(c), refcount_(0), pn_link_(0) {
+}
+
+void fetcher::incref() { refcount_++; }
+void fetcher::decref() {
+    refcount_--;
+    if (!refcount_) {
+        // fetcher needs to outlive its blocking_receiver unless disconnected 
from reactor
+        if (pn_link_) {
+            pn_record_set_handler(pn_link_attachments(pn_link_), 0);
+            pn_decref(pn_link_);
+        }
+        delete this;
+        return;
+    }
+}
+
+void fetcher::on_link_init(event &e) {
+    pn_link_ = e.link().pn_link();
+    pn_incref(pn_link_);
+}
+
+void fetcher::on_message(event &e) {
+    messages_.push_back(e.message());
+    deliveries_.push_back(e.delivery());
+    // Wake up enclosing blocking_connection.wait()
+    e.container().yield();
+}
+
+void fetcher::on_link_error(event &e) {
+    link lnk = e.link();
+    if (pn_link_state(lnk.pn_link()) & PN_LOCAL_ACTIVE) {
+        lnk.close();
+        throw error(MSG("Link detached: " << lnk.name()));
+    }
+}
+
+void fetcher::on_connection_error(event &e) {
+    throw error(MSG("Connection closed"));
+}
+
+bool fetcher::has_message() {
+    return !messages_.empty();
+}
+
+message fetcher::pop() {
+    if (messages_.empty())
+        throw error(MSG("blocking_receiver has no messages"));
+    delivery &dlv(deliveries_.front());
+    if (!dlv.settled())
+        unsettled_.push_back(dlv);
+    message m = messages_.front();
+    messages_.pop_front();
+    deliveries_.pop_front();
+    return m;
+}
+
+void fetcher::settle(delivery::state state) {
+    delivery &dlv = unsettled_.front();
+    if (state)
+        dlv.update(state);
+    dlv.settle();
+}
+
+} // namespace

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/fetcher.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/fetcher.hpp 
b/proton-c/bindings/cpp/src/fetcher.hpp
new file mode 100644
index 0000000..85200fe
--- /dev/null
+++ b/proton-c/bindings/cpp/src/fetcher.hpp
@@ -0,0 +1,58 @@
+#ifndef PROTON_CPP_FETCHER_H
+#define PROTON_CPP_FETCHER_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "proton/container.hpp"
+#include "proton/messaging_handler.hpp"
+#include "proton/blocking_connection.hpp"
+#include "proton/error.hpp"
+#include "msg.hpp"
+#include <string>
+#include <deque>
+
+namespace proton {
+
+class fetcher : public messaging_handler {
+  private:
+    blocking_connection connection_;
+    std::deque<message> messages_;
+    std::deque<delivery> deliveries_;
+    std::deque<delivery> unsettled_;
+    int refcount_;
+    pn_link_t *pn_link_;
+  public:
+    fetcher(blocking_connection &c, int p);
+    void incref();
+    void decref();
+    void on_message(event &e);
+    void on_link_error(event &e);
+    void on_connection_error(event &e);
+    void on_link_init(event &e);
+    bool has_message();
+    message pop();
+    void settle(delivery::state state = delivery::NONE);
+};
+
+
+}
+
+#endif  /*!PROTON_CPP_FETCHER_H*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/messaging_event.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/messaging_event.cpp 
b/proton-c/bindings/cpp/src/messaging_event.cpp
index d79974d..5062d76 100644
--- a/proton-c/bindings/cpp/src/messaging_event.cpp
+++ b/proton-c/bindings/cpp/src/messaging_event.cpp
@@ -82,6 +82,14 @@ link messaging_event::link() {
     throw error(MSG("No link context for event"));
 }
 
+delivery messaging_event::delivery() {
+    if (type_ == messaging_event::PROTON)
+        return proton_event::delivery();
+    if (parent_event_)
+        return parent_event_->delivery();
+    throw error(MSG("No delivery context for event"));
+}
+
 message messaging_event::message() {
     if (parent_event_) {
         pn_message_t *m = event_context(parent_event_->pn_event());

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/proton_event.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/proton_event.cpp 
b/proton-c/bindings/cpp/src/proton_event.cpp
index 71f2a08..46b43ee 100644
--- a/proton-c/bindings/cpp/src/proton_event.cpp
+++ b/proton-c/bindings/cpp/src/proton_event.cpp
@@ -78,6 +78,13 @@ link proton_event::link() {
     throw error(MSG("No link context for this event"));
 }
 
+delivery proton_event::delivery() {
+    pn_delivery_t *dlv = pn_event_delivery(pn_event());
+    if (dlv)
+        return proton::delivery(dlv);
+    throw error(MSG("No delivery context for this event"));
+}
+
 
 
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/4339b1c7/proton-c/bindings/cpp/src/receiver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/receiver.cpp 
b/proton-c/bindings/cpp/src/receiver.cpp
index 84412e6..b9ed680 100644
--- a/proton-c/bindings/cpp/src/receiver.cpp
+++ b/proton-c/bindings/cpp/src/receiver.cpp
@@ -40,5 +40,8 @@ void receiver::verify_type(pn_link_t *lnk) {
         throw error(MSG("Creating receiver with sender context"));
 }
 
+void receiver::flow(int count) {
+    pn_link_flow(pn_link(), count);
+}
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to