Repository: qpid-proton
Updated Branches:
  refs/heads/master 31c16db12 -> 0ee215539


PROTON-1536: [C++ Binding] Add tick() member function to connection_driver API


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/0ee21553
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/0ee21553
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/0ee21553

Branch: refs/heads/master
Commit: 0ee215539b5a3299132233c98447797806e5ff4a
Parents: 31c16db
Author: Andrew Stitcher <astitc...@apache.org>
Authored: Thu Aug 10 10:44:06 2017 -0400
Committer: Andrew Stitcher <astitc...@apache.org>
Committed: Fri Aug 11 13:50:51 2017 -0400

----------------------------------------------------------------------
 .../cpp/include/proton/io/connection_driver.hpp |  21 ++--
 .../bindings/cpp/include/proton/timestamp.hpp   |   1 +
 .../bindings/cpp/src/connection_driver_test.cpp | 106 +++++++++++++++++--
 .../bindings/cpp/src/io/connection_driver.cpp   |   6 ++
 4 files changed, 117 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ee21553/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp 
b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
index 5df210d..44275bc 100644
--- a/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
+++ b/proton-c/bindings/cpp/include/proton/io/connection_driver.hpp
@@ -22,20 +22,14 @@
  *
  */
 
-#include "../internal/config.hpp"
-#include "../connection.hpp"
 #include "../connection_options.hpp"
-#include "../error.hpp"
 #include "../error_condition.hpp"
-#include "../internal/export.hpp"
-#include "../internal/pn_unique_ptr.hpp"
-#include "../transport.hpp"
-#include "../types.hpp"
+#include "../fwd.hpp"
+#include "../internal/config.hpp"
+#include "../types_fwd.hpp"
 
 #include <proton/connection_driver.h>
 
-#include <cstddef>
-#include <utility>
 #include <string>
 
 namespace proton {
@@ -145,6 +139,15 @@ PN_CPP_CLASS_EXTERN connection_driver {
     /// Note that there may still be events to dispatch() or data to read.
     PN_CPP_EXTERN void write_close();
 
+    /// Indicate that time has passed.
+    ///
+    /// @return the expiration time of the next unexpired timer. You must arrange to call tick()
+    /// no later than this expiration time. In practice this will mean calling tick() every time
+    /// there is anything read or written, and if nothing is read or written then as soon as possible
+    /// after the returned timestamp (so you will probably need to set a platform-specific timeout to
+    /// know when this occurs).
+    PN_CPP_EXTERN timestamp tick(timestamp now);
+
     /// Inform the engine that the transport has been disconnected unexpectedly,
     /// without completing the AMQP connection close sequence.
     ///

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ee21553/proton-c/bindings/cpp/include/proton/timestamp.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/timestamp.hpp 
b/proton-c/bindings/cpp/include/proton/timestamp.hpp
index 6e4281b..ea578bb 100644
--- a/proton-c/bindings/cpp/include/proton/timestamp.hpp
+++ b/proton-c/bindings/cpp/include/proton/timestamp.hpp
@@ -56,6 +56,7 @@ inline bool operator==(timestamp x, timestamp y) { return 
x.milliseconds() == y.
 inline bool operator<(timestamp x, timestamp y) { return x.milliseconds() < 
y.milliseconds(); }
 
 inline timestamp operator+(timestamp ts, duration d) { return 
timestamp(ts.milliseconds() + d.milliseconds()); }
+inline timestamp operator-(timestamp ts, duration d) { return 
timestamp(ts.milliseconds() - d.milliseconds()); }
 inline duration operator-(timestamp t0, timestamp t1) { return 
duration(t0.milliseconds() - t1.milliseconds()); }
 inline timestamp operator+(duration d, timestamp ts) { return ts + d; }
 /// @}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ee21553/proton-c/bindings/cpp/src/connection_driver_test.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/connection_driver_test.cpp 
b/proton-c/bindings/cpp/src/connection_driver_test.cpp
index db5bc90..7fcde46 100644
--- a/proton-c/bindings/cpp/src/connection_driver_test.cpp
+++ b/proton-c/bindings/cpp/src/connection_driver_test.cpp
@@ -32,6 +32,7 @@
 #include "proton/sender_options.hpp"
 #include "proton/source_options.hpp"
 #include "proton/thread_safe.hpp"
+#include "proton/transport.hpp"
 #include "proton/types_fwd.hpp"
 #include "proton/uuid.hpp"
 
@@ -87,14 +88,17 @@ struct in_memory_driver : public connection_driver {
             throw test::error("no activity, interrupting test");
     }
 
-    void process() {
+    timestamp process(timestamp t = timestamp()) {
         check_idle();
         if (!dispatch())
             throw test::error("unexpected close: 
"+connection().error().what());
+        timestamp next_tick;
+        if (t!=timestamp()) next_tick = tick(t);
         do_read();
         do_write();
         check_idle();
         dispatch();
+        return next_tick;
     }
 };
 
@@ -115,12 +119,43 @@ struct driver_pair {
     void process() { a.process(); b.process(); }
 };
 
-template <class S> typename S::value_type quick_pop(S& s) {
-    ASSERT(!s.empty());
-    typename S::value_type x = s.front();
-    s.pop_front();
-    return x;
-}
+/// A pair of drivers that talk to each other in-memory, simulating a connection.
+/// This version also simulates the passage of time.
+struct timed_driver_pair {
+    duration timeout;
+    byte_stream ab, ba;
+    in_memory_driver a, b;
+    timestamp now;
+
+    timed_driver_pair(duration t, const connection_options& oa0, const 
connection_options& ob0,
+                const std::string& name=""
+    ) :
+        timeout(t),
+        a(ba, ab, name+"a"), b(ab, ba, name+"b"),
+        now(100100100)
+    {
+        connection_options oa(oa0);
+        connection_options ob(ob0);
+        a.connect(oa.idle_timeout(t));
+        b.accept(ob.idle_timeout(t));
+    }
+
+    void process_untimed() { a.process(); b.process(); }
+    void process_timed_succeed() {
+        timestamp anow = now + timeout - duration(100);
+        timestamp bnow = now + timeout - duration(100);
+        a.process(anow);
+        b.process(bnow);
+        now = std::max(anow, bnow);
+    }
+    void process_timed_fail() {
+        timestamp anow = now + timeout + timeout + duration(100);
+        timestamp bnow = now + timeout + timeout + duration(100);
+        a.process(anow);
+        b.process(bnow);
+        now = std::max(anow, bnow);
+    }
+};
 
 /// A handler that records incoming endpoints, errors etc.
 struct record_handler : public messaging_handler {
@@ -162,6 +197,13 @@ struct record_handler : public messaging_handler {
     }
 };
 
+template <class S> typename S::value_type quick_pop(S& s) {
+    ASSERT(!s.empty());
+    typename S::value_type x = s.front();
+    s.pop_front();
+    return x;
+}
+
 struct namer : public io::link_namer {
     char name;
     namer(char c) : name(c) {}
@@ -340,6 +382,52 @@ void test_message() {
     ASSERT_EQUAL(value("b"), m2.message_annotations().get("a"));
 }
 
+void test_message_timeout_succeed() {
+    // Verify a message arrives intact when tick() is called before the idle timeout expires
+    record_handler ha, hb;
+    timed_driver_pair d(duration(2000), ha, hb);
+
+    proton::sender s = d.a.connection().open_sender("x");
+    d.process_timed_succeed();
+    proton::message m("barefoot_timed_succeed");
+    m.properties().put("x", "y");
+    m.message_annotations().put("a", "b");
+    s.send(m);
+
+    while (hb.messages.size() == 0)
+        d.process_timed_succeed();
+
+    proton::message m2 = quick_pop(hb.messages);
+    ASSERT_EQUAL(value("barefoot_timed_succeed"), m2.body());
+    ASSERT_EQUAL(value("y"), m2.properties().get("x"));
+    ASSERT_EQUAL(value("b"), m2.message_annotations().get("a"));
+}
+
+void test_message_timeout_fail() {
+    // Verify both ends report a local-idle-timeout error when tick() is called too late
+    record_handler ha, hb;
+    timed_driver_pair d(duration(2000), ha, hb);
+
+    proton::sender s = d.a.connection().open_sender("x");
+    d.process_timed_fail();
+    proton::message m("barefoot_timed_fail");
+    m.properties().put("x", "y");
+    m.message_annotations().put("a", "b");
+    s.send(m);
+
+    d.process_timed_fail();
+
+    ASSERT_THROWS(test::error,
+        while (hb.messages.size() == 0) {
+            d.process_timed_fail();
+        }
+    );
+
+    ASSERT_EQUAL(1u, hb.transport_errors.size());
+    ASSERT_EQUAL("amqp:resource-limit-exceeded: local-idle-timeout expired", 
d.b.transport().error().what());
+    ASSERT_EQUAL(1u, ha.connection_errors.size());
+    ASSERT_EQUAL("amqp:resource-limit-exceeded: local-idle-timeout expired", 
d.a.connection().error().what());
+}
 }
 
 int main(int argc, char** argv) {
@@ -349,7 +437,9 @@ int main(int argc, char** argv) {
     RUN_ARGV_TEST(failed, test_driver_disconnected());
     RUN_ARGV_TEST(failed, test_no_container());
     RUN_ARGV_TEST(failed, test_spin_interrupt());
-    RUN_ARGV_TEST(failed, test_message());
     RUN_ARGV_TEST(failed, test_link_filters());
+    RUN_ARGV_TEST(failed, test_message());
+    RUN_ARGV_TEST(failed, test_message_timeout_succeed());
+    RUN_ARGV_TEST(failed, test_message_timeout_fail());
     return failed;
 }

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/0ee21553/proton-c/bindings/cpp/src/io/connection_driver.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/io/connection_driver.cpp 
b/proton-c/bindings/cpp/src/io/connection_driver.cpp
index d907e5c..cc83f51 100644
--- a/proton-c/bindings/cpp/src/io/connection_driver.cpp
+++ b/proton-c/bindings/cpp/src/io/connection_driver.cpp
@@ -19,9 +19,11 @@
 
 #include "proton/io/connection_driver.hpp"
 
+#include "proton/connection.hpp"
 #include "proton/container.hpp"
 #include "proton/error.hpp"
 #include "proton/messaging_handler.hpp"
+#include "proton/transport.hpp"
 #include "proton/uuid.hpp"
 #include "proton/work_queue.hpp"
 
@@ -128,6 +130,10 @@ void connection_driver::write_close() {
     pn_connection_driver_write_close(&driver_);
 }
 
+timestamp connection_driver::tick(timestamp now) {
+    return timestamp(pn_transport_tick(driver_.transport, now.milliseconds()));
+}
+
 void connection_driver::disconnected(const proton::error_condition& err) {
     pn_condition_t* condition = pn_transport_condition(driver_.transport);
     if (!pn_condition_is_set(condition))  {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to