Repository: qpid-proton Updated Branches: refs/heads/master 1e9e243a3 -> 200a4e221
PROTON-1194: C++ binding basic flow control Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/200a4e22 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/200a4e22 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/200a4e22 Branch: refs/heads/master Commit: 200a4e2217b9e2fdaea46d28577120c8c4633930 Parents: 1e9e243 Author: Clifford Jansen <[email protected]> Authored: Thu May 12 00:23:03 2016 -0700 Committer: Clifford Jansen <[email protected]> Committed: Thu May 12 00:23:03 2016 -0700 ---------------------------------------------------------------------- examples/cpp/CMakeLists.txt | 1 + examples/cpp/example_test.py | 8 + examples/cpp/flow_control.cpp | 229 +++++++++++++++++++ examples/cpp/queue_browser.cpp | 4 - .../bindings/cpp/include/proton/handler.hpp | 5 + proton-c/bindings/cpp/include/proton/link.hpp | 19 +- .../bindings/cpp/include/proton/receiver.hpp | 15 ++ proton-c/bindings/cpp/include/proton/sender.hpp | 6 + proton-c/bindings/cpp/src/contexts.hpp | 7 +- proton-c/bindings/cpp/src/handler.cpp | 2 + .../bindings/cpp/src/io/connection_engine.cpp | 1 + proton-c/bindings/cpp/src/link.cpp | 17 +- proton-c/bindings/cpp/src/messaging_adapter.cpp | 49 +++- proton-c/bindings/cpp/src/receiver.cpp | 32 +++ proton-c/bindings/cpp/src/receiver_options.cpp | 2 +- proton-c/bindings/cpp/src/sender.cpp | 9 + proton-c/bindings/cpp/src/sender_options.cpp | 2 +- 17 files changed, 376 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/examples/cpp/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt index 2cb258f..a9f8700 100644 --- a/examples/cpp/CMakeLists.txt +++ b/examples/cpp/CMakeLists.txt @@ -38,6 +38,7 @@ foreach(example connection_options queue_browser selected_recv + flow_control ssl ssl_client_cert encode_decode) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/examples/cpp/example_test.py ---------------------------------------------------------------------- diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py index 274efcf..7d4dc78 100644 --- a/examples/cpp/example_test.py +++ b/examples/cpp/example_test.py @@ -258,6 +258,14 @@ class ContainerExampleTest(BrokerTestCase): self.assertEqual(CLIENT_EXPECT, self.proc(["client", "-a", addr+"/examples"]).wait_exit()) + def test_flow_control(self): + want="""success: Example 1: simple credit +success: Example 2: basic drain +success: Example 3: drain without credit +success: Exmaple 4: high/low watermark +""" + self.assertEqual(want, self.proc(["flow_control", pick_addr(), "-quiet"]).wait_exit()) + def test_encode_decode(self): want=""" == Array, list and map of uniform type. http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/examples/cpp/flow_control.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/flow_control.cpp b/examples/cpp/flow_control.cpp new file mode 100644 index 0000000..59d9a46 --- /dev/null +++ b/examples/cpp/flow_control.cpp @@ -0,0 +1,229 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "proton/acceptor.hpp" +#include "proton/connection.hpp" +#include "proton/container.hpp" +#include "proton/handler.hpp" +#include "proton/sender.hpp" +#include "proton/tracker.hpp" +#include "proton/delivery.hpp" + +#include <iostream> +#include <sstream> + +#include "fake_cpp11.hpp" + +bool verbose = true; + +void verify(bool success, const std::string &msg) { + if (!success) + throw std::runtime_error("example failure:" + msg); + else { + std::cout << "success: " << msg << std::endl; + if (verbose) std::cout << std::endl; + } +} + + +// flow_sender manages the incoming connection and acts as the message sender. +class flow_sender : public proton::handler { + private: + int available; // Number of messages the sender may send assuming sufficient credit. + int sequence; + + public: + flow_sender() : available(0), sequence(0) {} + + void on_sendable(proton::sender &s) override { + if (verbose) + std::cout << "flow_sender in \"on_sendable\" with credit " << s.credit() + << " and " << available << " available messages" << std::endl; + for (int i = sequence; available && s.credit() > 0; i++) { + std::ostringstream mbody; + mbody << "flow_sender message " << sequence++; + proton::message m(mbody.str()); + s.send(m); + available--; + } + } + + void on_sender_drain_start(proton::sender &s) override { + if (verbose) + std::cout << "flow_sender in \"on_drain_start\" with credit " << s.credit() + << " making an internal call to \"on_sendble\"" << std::endl; + on_sendable(s); // send as many as we can + if (s.credit()) { + s.return_credit(); // return the rest + } + } + + void set_available(int n) { available = n; } +}; + +class flow_receiver : public proton::handler { + public: + int stage; + int received; + flow_sender &sender; + + flow_receiver(flow_sender &s) : stage(0), sender(s) {} + + void example_setup(int n) { + received = 0; + sender.set_available(n); + } + + void run_stage(proton::receiver &r, const std::string &caller) { + // Serialize the progression of the flow control examples. + switch (stage) { + case 0: + if (verbose) std::cout << "Example 1. Simple use of credit." << std::endl; + // TODO: add timeout callbacks, show no messages until credit. + example_setup(2); + r.add_credit(2); + break; + case 1: + if (r.credit() > 0) return; + verify(received == 2, "Example 1: simple credit"); + + if (verbose) std::cout << "Example 2. Use basic drain, sender has 3 \"immediate\" messages." << std::endl; + example_setup(3); + r.add_credit(5); // ask for up to 5 + r.drain(); // but only use what's available + break; + case 2: + if (caller == "on_message") return; + if (caller == "on_receiver_drain_finish") { + // Note that unused credit of 2 at sender is returned and is now 0. + verify(received == 3 && r.credit() == 0, "Example 2: basic drain"); + + if (verbose) std::cout << "Example 3. Drain use with no credit." << std::endl; + example_setup(0); + r.drain(); + break; + } + verify(false, "example 2 run_stage"); + return; + + case 3: + verify(caller == "on_receiver_drain_finish" && received == 0, "Example 3: drain without credit"); + + if (verbose) std::cout << "Example 4. Show using high(10)/low(3) watermark for 25 messages." << std::endl; + example_setup(25); + r.add_credit(10); + break; + + case 4: + if (received < 25) { + // Top up credit as needed. + uint32_t credit = r.credit(); + if (credit <= 3) { + uint32_t new_credit = 10; + uint32_t remaining = 25 - received; + if (new_credit > remaining) + new_credit = remaining; + if (new_credit > credit) { + r.add_credit(new_credit - credit); + if (verbose) + std::cout << "flow_receiver adding credit for " << new_credit - credit + << " messages" << std::endl; + } + } + return; + } + + verify(received == 25 && r.credit() == 0, "Exmaple 4: high/low watermark"); + r.connection().close(); + break; + + default: + throw std::runtime_error("run_stage sequencing error"); + } + stage++; + } + + void on_receiver_open(proton::receiver &r) override { + run_stage(r, "on_receiver_open"); + } + + void on_message(proton::delivery &d, proton::message &m) override { + if (verbose) + std::cout << "flow_receiver in \"on_message\" with " << m.body() << std::endl; + proton::receiver r(d.receiver()); + received++; + run_stage(r, "on_message"); + } + + void on_receiver_drain_finish(proton::receiver &r) override { + if (verbose) + std::cout << "flow_receiver in \"on_receiver_drain_finish\"" << std::endl; + run_stage(r, "on_receiver_drain_finish"); + } +}; + + +class flow_control : public proton::handler { + private: + std::string url; + proton::acceptor acceptor; + flow_sender send_handler; + flow_receiver receive_handler; + + public: + flow_control(const std::string& u) : url(u), receive_handler(send_handler) {} + + void on_container_start(proton::container &c) override { + acceptor = c.listen(url, proton::connection_options().handler(&send_handler)); + c.connect(url); + } + + void on_connection_open(proton::connection &c) override { + if (c.active()) { + // outbound connection + c.open_receiver("flow_example", proton::receiver_options().handler(&receive_handler).credit_window(0)); + } + } + + void on_connection_close(proton::connection &) override { + acceptor.close(); + } +}; + +int main(int argc, char **argv) { + std::string quiet_arg("-quiet"); + if (argc > 2 && quiet_arg == argv[2]) + verbose = false; + try { + // Pick an "unusual" port since we are going to be talking to + // ourselves, not a broker. + std::string url = argc > 1 ? argv[1] : "127.0.0.1:8888/examples"; + + flow_control fc(url); + proton::container(fc).run(); + + return 0; + } catch (const std::exception& e) { + std::cerr << e.what() << std::endl; + } + + return 1; +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/examples/cpp/queue_browser.cpp ---------------------------------------------------------------------- diff --git a/examples/cpp/queue_browser.cpp b/examples/cpp/queue_browser.cpp index a1fa471..87cb147 100644 --- a/examples/cpp/queue_browser.cpp +++ b/examples/cpp/queue_browser.cpp @@ -48,10 +48,6 @@ class browser : public proton::handler { void on_message(proton::delivery &d, proton::message &m) override { std::cout << m.body() << std::endl; - - if (d.receiver().queued() == 0 && d.receiver().drained() > 0) { - d.connection().close(); - } } }; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/include/proton/handler.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/handler.hpp b/proton-c/bindings/cpp/include/proton/handler.hpp index b59e0cf..3cc2759 100644 --- a/proton-c/bindings/cpp/include/proton/handler.hpp +++ b/proton-c/bindings/cpp/include/proton/handler.hpp @@ -141,6 +141,11 @@ PN_CPP_CLASS_EXTERN handler /// The sending peer settled a transfer. PN_CPP_EXTERN virtual void on_delivery_settle(delivery &d); + /// The receiving peer has requested a drain of remaining credit. + PN_CPP_EXTERN virtual void on_sender_drain_start(sender &s); + /// The credit outstanding at the time of the call to receiver::drain has been consumed or returned. + PN_CPP_EXTERN virtual void on_receiver_drain_finish(receiver &r); + /// Fallback error handling. PN_CPP_EXTERN virtual void on_error(const error_condition &c); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/include/proton/link.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/link.hpp b/proton-c/bindings/cpp/include/proton/link.hpp index f57754c..1a5347c 100644 --- a/proton-c/bindings/cpp/include/proton/link.hpp +++ b/proton-c/bindings/cpp/include/proton/link.hpp @@ -86,20 +86,11 @@ PN_CPP_CLASS_EXTERN link : public internal::object<pn_link_t> , public endpoint /// Credit available on the link. PN_CPP_EXTERN int credit() const; - /// The number of deliveries queued on the link. - PN_CPP_EXTERN int queued(); - - /// @cond INTERNAL - /// XXX ask about when this is used - /// The number of unsettled deliveries on the link. - PN_CPP_EXTERN int unsettled(); - /// @endcond - - /// @cond INTERNAL - /// XXX revisit mind-melting API inherited from C - /// XXX flush() ? drain, and drain_completed (sender and receiver ends) - PN_CPP_EXTERN int drained(); - /// @endcond + /// True for a receiver if a drain cycle has been started and the + /// corresponding on_receiver_drain_finish event is still pending. + /// @see receiver::drain. True for a sender if the receiver has + /// requested a drain of credit and the sender has unused credit. + PN_CPP_EXTERN bool draining(); /// Get the link name. PN_CPP_EXTERN std::string name() const; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/include/proton/receiver.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/receiver.hpp b/proton-c/bindings/cpp/include/proton/receiver.hpp index 0bdff5d..af2ab1e 100644 --- a/proton-c/bindings/cpp/include/proton/receiver.hpp +++ b/proton-c/bindings/cpp/include/proton/receiver.hpp @@ -52,6 +52,21 @@ PN_CPP_CLASS_EXTERN receiver : public link { /// Get the target node. PN_CPP_EXTERN class target target() const; + /// Increment the credit available to the sender. Credit granted + /// during a drain cycle is not communicated to the receiver until + /// the drain completes. + PN_CPP_EXTERN void add_credit(uint32_t); + + /// Commence a drain cycle. If there is positive credit, a + /// request is sent to the sender to immediately use up all of the + /// existing credit balance by sending messages that are + /// immediately available and releasing any unused credit (see + /// sender::return_credit). Throws proton::error if a drain cycle + /// is already in progress. An on_receiver_drain_finish event + /// will be generated when the outstanding drained credit reaches + /// zero. + PN_CPP_EXTERN void drain(); + /// @cond INTERNAL friend class internal::factory<receiver>; friend class receiver_iterator; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/include/proton/sender.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/sender.hpp b/proton-c/bindings/cpp/include/proton/sender.hpp index 452809e..fd63ff4 100644 --- a/proton-c/bindings/cpp/include/proton/sender.hpp +++ b/proton-c/bindings/cpp/include/proton/sender.hpp @@ -58,6 +58,12 @@ PN_CPP_CLASS_EXTERN sender : public link /// Get the target node. PN_CPP_EXTERN class target target() const; + /// Return all unused credit to the receiver in response to a + /// drain request. Has no effect unless there has been a drain + /// request and there is remaining credit to use or return. + /// @see receiver::drain. + PN_CPP_EXTERN void return_credit(); + /// @cond INTERNAL friend class internal::factory<sender>; friend class sender_iterator; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/contexts.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/contexts.hpp b/proton-c/bindings/cpp/src/contexts.hpp index 0305b8a..9a4a9fe 100644 --- a/proton-c/bindings/cpp/src/contexts.hpp +++ b/proton-c/bindings/cpp/src/contexts.hpp @@ -83,13 +83,14 @@ class context { // Connection context used by all connections. class connection_context : public context { public: - connection_context() : default_session(0), work_queue(0) {} + connection_context() : default_session(0), work_queue(0), collector(0) {} // Used by all connections pn_session_t *default_session; // Owned by connection. message event_message; // re-used by messaging_adapter for performance. id_generator link_gen; // Link name generator. class work_queue* work_queue; // Work queue if this is proton::controller connection. + pn_collector_t* collector; internal::pn_unique_ptr<proton_handler> handler; @@ -120,10 +121,12 @@ class listener_context : public context { class link_context : public context { public: static link_context& get(pn_link_t* l); - link_context() : credit_window(10), auto_accept(true), auto_settle(true) {} + link_context() : credit_window(10), auto_accept(true), auto_settle(true), draining(false), pending_credit(0) {} int credit_window; bool auto_accept; bool auto_settle; + bool draining; + uint32_t pending_credit; }; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/handler.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/handler.cpp b/proton-c/bindings/cpp/src/handler.cpp index 5bb3859..d1d1745 100644 --- a/proton-c/bindings/cpp/src/handler.cpp +++ b/proton-c/bindings/cpp/src/handler.cpp @@ -58,6 +58,8 @@ void handler::on_tracker_reject(tracker &) {} void handler::on_tracker_release(tracker &) {} void handler::on_tracker_settle(tracker &) {} void handler::on_delivery_settle(delivery &) {} +void handler::on_sender_drain_start(sender &) {} +void handler::on_receiver_drain_finish(receiver &) {} void handler::on_error(const error_condition& c) { throw proton::error(c.what()); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/io/connection_engine.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/io/connection_engine.cpp b/proton-c/bindings/cpp/src/io/connection_engine.cpp index 045ae66..3865953 100644 --- a/proton-c/bindings/cpp/src/io/connection_engine.cpp +++ b/proton-c/bindings/cpp/src/io/connection_engine.cpp @@ -58,6 +58,7 @@ connection_engine::connection_engine(class handler &h, const connection_options& if (connection_.container_id().empty()) pn_connection_set_container(unwrap(connection_), uuid::random().str().c_str()); id_generator &link_gen = connection_context::get(connection_).link_gen; + connection_context::get(connection_).collector = collector_.get(); if (link_gen.prefix().empty()) link_gen.prefix(uuid::random().str()+"/"); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/link.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/link.cpp b/proton-c/bindings/cpp/src/link.cpp index 725b3b3..9b4f325 100644 --- a/proton-c/bindings/cpp/src/link.cpp +++ b/proton-c/bindings/cpp/src/link.cpp @@ -49,12 +49,21 @@ void link::detach() { } int link::credit() const { - return pn_link_credit(pn_object()); + pn_link_t *lnk = pn_object(); + if (pn_link_is_sender(lnk)) + return pn_link_credit(lnk); + link_context& lctx = link_context::get(lnk); + return pn_link_credit(lnk) + lctx.pending_credit; } -int link::queued() { return pn_link_queued(pn_object()); } -int link::unsettled() { return pn_link_unsettled(pn_object()); } -int link::drained() { return pn_link_drained(pn_object()); } +bool link::draining() { + pn_link_t *lnk = pn_object(); + link_context& lctx = link_context::get(lnk); + if (pn_link_is_sender(lnk)) + return pn_link_credit(lnk) > 0 && lctx.draining; + else + return lctx.draining; +} std::string link::name() const { return str(pn_link_name(pn_object()));} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/messaging_adapter.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/messaging_adapter.cpp b/proton-c/bindings/cpp/src/messaging_adapter.cpp index 8ac377e..1fe9bcd 100644 --- a/proton-c/bindings/cpp/src/messaging_adapter.cpp +++ b/proton-c/bindings/cpp/src/messaging_adapter.cpp @@ -68,13 +68,35 @@ void messaging_adapter::on_reactor_init(proton_event &pe) { void messaging_adapter::on_link_flow(proton_event &pe) { pn_event_t *pne = pe.pn_event(); pn_link_t *lnk = pn_event_link(pne); - sender s(make_wrapper<sender>(lnk)); + // TODO: process session flow data, if no link-specific data, just return. + if (!lnk) return; + link_context& lctx = link_context::get(lnk); int state = pn_link_state(lnk); - if (lnk && pn_link_is_sender(lnk) && pn_link_credit(lnk) > 0 && - (state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) - { - // create on_message extended event - delegate_.on_sendable(s); + if ((state&PN_LOCAL_ACTIVE) && (state&PN_REMOTE_ACTIVE)) { + if (pn_link_is_sender(lnk)) { + if (pn_link_credit(lnk) > 0) { + sender s(make_wrapper<sender>(lnk)); + if (pn_link_draining(lnk)) { + if (!lctx.draining) { + lctx.draining = true; + delegate_.on_sender_drain_start(s); + } + } + else { + lctx.draining = false; + } + // create on_message extended event + delegate_.on_sendable(s); + } + } + else { + // receiver + if (!pn_link_credit(lnk) && lctx.draining) { + lctx.draining = false; + receiver r(make_wrapper<receiver>(lnk)); + delegate_.on_receiver_drain_finish(r); + } + } } credit_topup(lnk); } @@ -103,11 +125,26 @@ void messaging_adapter::on_delivery(proton_event &pe) { delegate_.on_message(d, msg); if (lctx.auto_accept && !d.settled()) d.accept(); + if (lctx.draining && !pn_link_credit(lnk)) { + lctx.draining = false; + receiver r(make_wrapper<receiver>(lnk)); + delegate_.on_receiver_drain_finish(r); + } } } else if (pn_delivery_updated(dlv) && d.settled()) { delegate_.on_delivery_settle(d); } + if (lctx.draining && pn_link_credit(lnk) == 0) { + lctx.draining = false; + pn_link_set_drain(lnk, false); + receiver r(make_wrapper<receiver>(lnk)); + delegate_.on_receiver_drain_finish(r); + if (lctx.pending_credit) { + pn_link_flow(lnk, lctx.pending_credit); + lctx.pending_credit = 0; + } + } credit_topup(lnk); } else { tracker t(make_wrapper<tracker>(dlv)); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/receiver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/receiver.cpp b/proton-c/bindings/cpp/src/receiver.cpp index 4760c71..eb1c06e 100644 --- a/proton-c/bindings/cpp/src/receiver.cpp +++ b/proton-c/bindings/cpp/src/receiver.cpp @@ -24,10 +24,13 @@ #include "msg.hpp" #include "proton_bits.hpp" +#include "contexts.hpp" #include "proton/connection.h" #include "proton/session.h" #include "proton/link.h" +#include "proton/event.h" +#include "proton/reactor.h" namespace proton { @@ -46,6 +49,35 @@ class target receiver::target() const { return proton::target(*this); } +void receiver::add_credit(uint32_t credit) { + link_context &ctx = link_context::get(pn_object()); + if (ctx.draining) + ctx.pending_credit += credit; + else + pn_link_flow(pn_object(), credit); +} + +void receiver::drain() { + link_context &ctx = link_context::get(pn_object()); + if (ctx.draining) + throw proton::error("drain already in progress"); + else { + ctx.draining = true; + if (credit() > 0) + pn_link_set_drain(pn_object(), true); + else { + // Drain is already complete. No state to communicate over the wire. + // Create dummy flow event where "drain finish" can be detected. + pn_connection_t *pnc = pn_session_connection(pn_link_session(pn_object())); + connection_context& cctx = connection_context::get(pnc); + // connection_engine collector is per connection. Reactor collector is global. + pn_collector_t *coll = cctx.collector; + if (!coll) + coll = pn_reactor_collector(pn_object_reactor(pnc)); + pn_collector_put(coll, PN_OBJECT, pn_object(), PN_LINK_FLOW); + } + } +} receiver_iterator receiver_iterator::operator++() { if (!!obj_) { http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/receiver_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/receiver_options.cpp b/proton-c/bindings/cpp/src/receiver_options.cpp index f39ea66..1a1b3f3 100644 --- a/proton-c/bindings/cpp/src/receiver_options.cpp +++ b/proton-c/bindings/cpp/src/receiver_options.cpp @@ -45,7 +45,7 @@ template <class T> struct option { class receiver_options::impl { static void set_handler(receiver l, proton_handler &h) { pn_record_t *record = pn_link_attachments(unwrap(l)); - internal::pn_ptr<pn_handler_t> chandler = connection().container().impl_->cpp_handler(&h); + internal::pn_ptr<pn_handler_t> chandler = l.connection().container().impl_->cpp_handler(&h); pn_record_set_handler(record, chandler.get()); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/sender.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/sender.cpp b/proton-c/bindings/cpp/src/sender.cpp index fc33524..e45cf7a 100644 --- a/proton-c/bindings/cpp/src/sender.cpp +++ b/proton-c/bindings/cpp/src/sender.cpp @@ -28,6 +28,7 @@ #include "proton/types.h" #include "proton_bits.hpp" +#include "contexts.hpp" namespace proton { @@ -61,9 +62,17 @@ tracker sender::send(const message &message) { pn_link_advance(pn_object()); if (pn_link_snd_settle_mode(pn_object()) == PN_SND_SETTLED) pn_delivery_settle(dlv); + if (!pn_link_credit(pn_object())) + link_context::get(pn_object()).draining = false; return make_wrapper<tracker>(dlv); } +void sender::return_credit() { + link_context &lctx = link_context::get(pn_object()); + lctx.draining = false; + pn_link_drained(pn_object()); +} + sender_iterator sender_iterator::operator++() { if (!!obj_) { pn_link_t *lnk = pn_link_next(obj_.pn_object(), 0); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/200a4e22/proton-c/bindings/cpp/src/sender_options.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/sender_options.cpp b/proton-c/bindings/cpp/src/sender_options.cpp index ed030d9..4786937 100644 --- a/proton-c/bindings/cpp/src/sender_options.cpp +++ b/proton-c/bindings/cpp/src/sender_options.cpp @@ -43,7 +43,7 @@ template <class T> struct option { class sender_options::impl { static void set_handler(sender l, proton_handler &h) { pn_record_t *record = pn_link_attachments(unwrap(l)); - internal::pn_ptr<pn_handler_t> chandler = connection().container().impl_->cpp_handler(&h); + internal::pn_ptr<pn_handler_t> chandler = l.connection().container().impl_->cpp_handler(&h); pn_record_set_handler(record, chandler.get()); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
