PROTON-1536: [C++ Binding] Add tick() member function to connection_driver API
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0ee21553 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0ee21553 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0ee21553 Branch: refs/heads/go1 Commit: 0ee215539b5a3299132233c98447797806e5ff4a Parents: 31c16db Author: Andrew Stitcher <[email protected]> Authored: Thu Aug 10 10:44:06 2017 -0400 Committer: Andrew Stitcher <[email protected]> Committed: Fri Aug 11 13:50:51 2017 -0400 ---------------------------------------------------------------------- .../cpp/include/proton/io/connection_driver.hpp | 21 ++-- .../bindings/cpp/include/proton/timestamp.hpp | 1 + .../bindings/cpp/src/connection_driver_test.cpp | 106 +++++++++++++++++-- .../bindings/cpp/src/io/connection_driver.cpp | 6 ++ 4 files changed, 117 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ee21553/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp index 5df210d..44275bc 100644 --- a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp +++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp @@ -22,20 +22,14 @@ * */ -#include "../internal/config.hpp" -#include "../connection.hpp" #include "../connection_options.hpp" -#include "../error.hpp" #include "../error_condition.hpp" -#include "../internal/export.hpp" -#include "../internal/pn_unique_ptr.hpp" -#include "../transport.hpp" -#include "../types.hpp" +#include "../fwd.hpp" +#include "../internal/config.hpp" +#include "../types_fwd.hpp" #include <proton/connection_driver.h> -#include <cstddef> -#include <utility> #include <string> namespace proton { @@ -145,6 +139,15 @@ PN_CPP_CLASS_EXTERN connection_driver { /// Note that there may still be events to dispatch() or data to read. PN_CPP_EXTERN void write_close(); + /// Indicate that time has passed + /// + /// @return the expiration time of the next unexpired timer. You must arrange to call tick() + /// no later than this expiration time. In practice this will mean calling tick() every time + /// there is anything read or written, and if nothing is read or written then as soon as possible + /// after the returned timestamp (so you will probably need to set a platform specific timeout to + /// know when this occurs). + PN_CPP_EXTERN timestamp tick(timestamp now); + /// Inform the engine that the transport been disconnected unexpectedly, /// without completing the AMQP connection close sequence. /// http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ee21553/proton-c/bindings/cpp/include/proton/timestamp.hpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/include/proton/timestamp.hpp b/proton-c/bindings/cpp/include/proton/timestamp.hpp index 6e4281b..ea578bb 100644 --- a/proton-c/bindings/cpp/include/proton/timestamp.hpp +++ b/proton-c/bindings/cpp/include/proton/timestamp.hpp @@ -56,6 +56,7 @@ inline bool operator==(timestamp x, timestamp y) { return x.milliseconds() == y. inline bool operator<(timestamp x, timestamp y) { return x.milliseconds() < y.milliseconds(); } inline timestamp operator+(timestamp ts, duration d) { return timestamp(ts.milliseconds() + d.milliseconds()); } +inline timestamp operator-(timestamp ts, duration d) { return timestamp(ts.milliseconds() - d.milliseconds()); } inline duration operator-(timestamp t0, timestamp t1) { return duration(t0.milliseconds() - t1.milliseconds()); } inline timestamp operator+(duration d, timestamp ts) { return ts + d; } /// @} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ee21553/proton-c/bindings/cpp/src/connection_driver_test.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/connection_driver_test.cpp b/proton-c/bindings/cpp/src/connection_driver_test.cpp index db5bc90..7fcde46 100644 --- a/proton-c/bindings/cpp/src/connection_driver_test.cpp +++ b/proton-c/bindings/cpp/src/connection_driver_test.cpp @@ -32,6 +32,7 @@ #include "proton/sender_options.hpp" #include "proton/source_options.hpp" #include "proton/thread_safe.hpp" +#include "proton/transport.hpp" #include "proton/types_fwd.hpp" #include "proton/uuid.hpp" @@ -87,14 +88,17 @@ struct in_memory_driver : public connection_driver { throw test::error("no activity, interrupting test"); } - void process() { + timestamp process(timestamp t = timestamp()) { check_idle(); if (!dispatch()) throw test::error("unexpected close: "+connection().error().what()); + timestamp next_tick; + if (t!=timestamp()) next_tick = tick(t); do_read(); do_write(); check_idle(); dispatch(); + return next_tick; } }; @@ -115,12 +119,43 @@ struct driver_pair { void process() { a.process(); b.process(); } }; -template <class S> typename S::value_type quick_pop(S& s) { - ASSERT(!s.empty()); - typename S::value_type x = s.front(); - s.pop_front(); - return x; -} +/// A pair of drivers that talk to each other in-memory, simulating a connection. +/// This version also simulates the passage of time +struct timed_driver_pair { + duration timeout; + byte_stream ab, ba; + in_memory_driver a, b; + timestamp now; + + timed_driver_pair(duration t, const connection_options& oa0, const connection_options& ob0, + const std::string& name="" + ) : + timeout(t), + a(ba, ab, name+"a"), b(ab, ba, name+"b"), + now(100100100) + { + connection_options oa(oa0); + connection_options ob(ob0); + a.connect(oa.idle_timeout(t)); + b.accept(ob.idle_timeout(t)); + } + + void process_untimed() { a.process(); b.process(); } + void process_timed_succeed() { + timestamp anow = now + timeout - duration(100); + timestamp bnow = now + timeout - duration(100); + a.process(anow); + b.process(bnow); + now = std::max(anow, bnow); + } + void process_timed_fail() { + timestamp anow = now + timeout + timeout + duration(100); + timestamp bnow = now + timeout + timeout + duration(100); + a.process(anow); + b.process(bnow); + now = std::max(anow, bnow); + } +}; /// A handler that records incoming endpoints, errors etc. struct record_handler : public messaging_handler { @@ -162,6 +197,13 @@ struct record_handler : public messaging_handler { } }; +template <class S> typename S::value_type quick_pop(S& s) { + ASSERT(!s.empty()); + typename S::value_type x = s.front(); + s.pop_front(); + return x; +} + struct namer : public io::link_namer { char name; namer(char c) : name(c) {} @@ -340,6 +382,52 @@ void test_message() { ASSERT_EQUAL(value("b"), m2.message_annotations().get("a")); } +void test_message_timeout_succeed() { + // Verify a message arrives intact + record_handler ha, hb; + timed_driver_pair d(duration(2000), ha, hb); + + proton::sender s = d.a.connection().open_sender("x"); + d.process_timed_succeed(); + proton::message m("barefoot_timed_succeed"); + m.properties().put("x", "y"); + m.message_annotations().put("a", "b"); + s.send(m); + + while (hb.messages.size() == 0) + d.process_timed_succeed(); + + proton::message m2 = quick_pop(hb.messages); + ASSERT_EQUAL(value("barefoot_timed_succeed"), m2.body()); + ASSERT_EQUAL(value("y"), m2.properties().get("x")); + ASSERT_EQUAL(value("b"), m2.message_annotations().get("a")); +} + +void test_message_timeout_fail() { + // Verify a message arrives intact + record_handler ha, hb; + timed_driver_pair d(duration(2000), ha, hb); + + proton::sender s = d.a.connection().open_sender("x"); + d.process_timed_fail(); + proton::message m("barefoot_timed_fail"); + m.properties().put("x", "y"); + m.message_annotations().put("a", "b"); + s.send(m); + + d.process_timed_fail(); + + ASSERT_THROWS(test::error, + while (hb.messages.size() == 0) { + d.process_timed_fail(); + } + ); + + ASSERT_EQUAL(1u, hb.transport_errors.size()); + ASSERT_EQUAL("amqp:resource-limit-exceeded: local-idle-timeout expired", d.b.transport().error().what()); + ASSERT_EQUAL(1u, ha.connection_errors.size()); + ASSERT_EQUAL("amqp:resource-limit-exceeded: local-idle-timeout expired", d.a.connection().error().what()); +} } int main(int argc, char** argv) { @@ -349,7 +437,9 @@ int main(int argc, char** argv) { RUN_ARGV_TEST(failed, test_driver_disconnected()); RUN_ARGV_TEST(failed, test_no_container()); RUN_ARGV_TEST(failed, test_spin_interrupt()); - RUN_ARGV_TEST(failed, test_message()); RUN_ARGV_TEST(failed, test_link_filters()); + RUN_ARGV_TEST(failed, test_message()); + RUN_ARGV_TEST(failed, test_message_timeout_succeed()); + RUN_ARGV_TEST(failed, test_message_timeout_fail()); return failed; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ee21553/proton-c/bindings/cpp/src/io/connection_driver.cpp ---------------------------------------------------------------------- diff --git a/proton-c/bindings/cpp/src/io/connection_driver.cpp b/proton-c/bindings/cpp/src/io/connection_driver.cpp index d907e5c..cc83f51 100644 --- a/proton-c/bindings/cpp/src/io/connection_driver.cpp +++ b/proton-c/bindings/cpp/src/io/connection_driver.cpp @@ -19,9 +19,11 @@ #include "proton/io/connection_driver.hpp" +#include "proton/connection.hpp" #include "proton/container.hpp" #include "proton/error.hpp" #include "proton/messaging_handler.hpp" +#include "proton/transport.hpp" #include "proton/uuid.hpp" #include "proton/work_queue.hpp" @@ -128,6 +130,10 @@ void connection_driver::write_close() { pn_connection_driver_write_close(&driver_); } +timestamp connection_driver::tick(timestamp now) { + return timestamp(pn_transport_tick(driver_.transport, now.milliseconds())); +} + void connection_driver::disconnected(const proton::error_condition& err) { pn_condition_t* condition = pn_transport_condition(driver_.transport); if (!pn_condition_is_set(condition)) { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
