Repository: qpid-proton
Updated Branches:
  refs/heads/master 118a5c3d9 -> 2d44fa2df


PROTON-1221: c++ container::schedule() support.

container::schedule() with simple example of sending messages at a fixed 
interval.
Examples for C++03 and C++11.
Modified inject() to use the same proton::void_function0 as schedule for C++03.

Note: the example chains schedule() calls at a fixed interval. A precise
fixed-frequency sender should take account of the actual time to correct for
variations.


Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2d44fa2d
Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2d44fa2d
Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2d44fa2d

Branch: refs/heads/master
Commit: 2d44fa2dfc9f528d965343e80d35142d5fcc0bf9
Parents: 118a5c3
Author: Alan Conway <[email protected]>
Authored: Tue Jun 7 16:50:13 2016 -0400
Committer: Alan Conway <[email protected]>
Committed: Tue Jun 14 09:10:39 2016 -0400

----------------------------------------------------------------------
 examples/cpp/CMakeLists.txt                     |  11 +-
 examples/cpp/README.dox                         |  16 +++
 examples/cpp/example_test.py                    |  22 ++--
 examples/cpp/mt/epoll_container.cpp             |   7 +-
 examples/cpp/scheduled_send.cpp                 | 108 ++++++++++++++++
 examples/cpp/scheduled_send_03.cpp              | 124 +++++++++++++++++++
 .../bindings/cpp/include/proton/container.hpp   |  13 ++
 .../bindings/cpp/include/proton/event_loop.hpp  |  15 +--
 .../bindings/cpp/include/proton/function.hpp    |  42 +++++++
 .../cpp/include/proton/internal/config.hpp      |   4 +
 .../bindings/cpp/include/proton/thread_safe.hpp |   4 +-
 proton-c/bindings/cpp/src/container_impl.cpp    |  59 +++++++++
 proton-c/bindings/cpp/src/container_impl.hpp    |   4 +
 .../bindings/cpp/src/test_dummy_container.hpp   |   6 +-
 14 files changed, 414 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/examples/cpp/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/examples/cpp/CMakeLists.txt b/examples/cpp/CMakeLists.txt
index 06ec1a4..851de2b 100644
--- a/examples/cpp/CMakeLists.txt
+++ b/examples/cpp/CMakeLists.txt
@@ -23,13 +23,14 @@ include_directories(${ProtonCpp_INCLUDE_DIRS})
 link_libraries(${ProtonCpp_LIBRARIES})
 add_definitions(${CXX_WARNING_FLAGS})
 
-# Single-threaded examples.
+# Single-threaded examples that work on C++03
 foreach(example
     broker
     helloworld
     helloworld_direct
     simple_recv
     simple_send
+    scheduled_send_03
     direct_recv
     direct_send
     client
@@ -45,6 +46,14 @@ foreach(example
   add_executable(${example} ${example}.cpp)
 endforeach()
 
+# Single-threaded examples that require C++11
+if(HAS_CPP11)
+  foreach(example
+      scheduled_send)
+    add_executable(${example} ${example}.cpp)
+  endforeach()
+endif()
+
 # Python test runner
 set(env_py ${PYTHON_EXECUTABLE} ${CMAKE_SOURCE_DIR}/proton-c/env.py)
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/examples/cpp/README.dox
----------------------------------------------------------------------
diff --git a/examples/cpp/README.dox b/examples/cpp/README.dox
index 897b302..be088be 100644
--- a/examples/cpp/README.dox
+++ b/examples/cpp/README.dox
@@ -138,3 +138,19 @@ A multithreaded broker, that will work on any 
multi-threaded container. See @ref
 __Requires C++11__
 
 */
+
+/** @example schedule_send.cpp
+
+Shows how to use proton::container::schedule to schedule a timed callback.
+This version uses std::function and so requires C++11 or better. For a C++03 
compatible
+approach see @ref schedule_send_03.cpp.
+
+*/
+
+/** @example schedule_send_03.cpp
+
+Shows how to use proton::container::schedule to schedule a timed callback in a
+C++03 compatible way. See @ref schedule_send.cpp for a more convenient approach
+using std::function if you have C++11.
+
+*/

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/examples/cpp/example_test.py
----------------------------------------------------------------------
diff --git a/examples/cpp/example_test.py b/examples/cpp/example_test.py
index 65dcee8..bb7c48d 100644
--- a/examples/cpp/example_test.py
+++ b/examples/cpp/example_test.py
@@ -337,6 +337,20 @@ Hello World!
         expect_found = (out.find(expect) >= 0)
         self.assertEqual(expect_found, True)
 
+    def test_scheduled_send_03(self):
+        # Output should be a bunch of "send" lines but can't guarantee exactly 
how many.
+        out = self.proc(["scheduled_send_03", "-a", 
self.addr+"scheduled_send", "-t", "0.1", "-i", "0.001"]).wait_exit().split()
+        self.assertGreater(len(out), 0);
+        self.assertEqual(["send"]*len(out), out)
+
+    def test_scheduled_send(self):
+        try:
+            out = self.proc(["scheduled_send", "-a", 
self.addr+"scheduled_send", "-t", "0.1", "-i", "0.001"]).wait_exit().split()
+            self.assertGreater(len(out), 0);
+            self.assertEqual(["send"]*len(out), out)
+        except ProcError:       # File not found, not a C++11 build.
+            pass
+
 
 class EngineTestCase(BrokerTestCase):
     """Run selected clients to test a connction_engine broker."""
@@ -377,14 +391,6 @@ class EngineTestCase(BrokerTestCase):
         self.assertEqual(CLIENT_EXPECT,
                          self.proc(["client", "-a", self.addr]).wait_exit())
 
-    def test_flow_control(self):
-        return
-        want="""success: Example 1: simple credit
-success: Example 2: basic drain
-success: Example 3: drain without credit
-success: Exmaple 4: high/low watermark
-"""
-        self.assertEqual(want, self.proc(["flow_control", pick_addr(), 
"-quiet"]).wait_exit())
 
 class MtBrokerTest(EngineTestCase):
     broker_exe = "mt_broker"

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/examples/cpp/mt/epoll_container.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/mt/epoll_container.cpp 
b/examples/cpp/mt/epoll_container.cpp
index feea300..63b6a5f 100644
--- a/examples/cpp/mt/epoll_container.cpp
+++ b/examples/cpp/mt/epoll_container.cpp
@@ -134,6 +134,9 @@ class epoll_container : public 
proton::io::container_impl_base {
         std::atomic<uint64_t> count_;
     };
 
+     // FIXME aconway 2016-06-07: Unfinished
+    void schedule(proton::duration, std::function<void()>) OVERRIDE { throw 
std::logic_error("FIXME"); }
+    void schedule(proton::duration, proton::void_function0&) OVERRIDE { throw 
std::logic_error("FIXME"); }
     atomic_link_namer link_namer;
 
   private:
@@ -244,8 +247,8 @@ class epoll_event_loop : public proton::event_loop {
         return true;
     }
 
-    bool inject(proton::inject_handler& h) OVERRIDE {
-        return inject(std::bind(&proton::inject_handler::on_inject, &h));
+    bool inject(proton::void_function0& f) OVERRIDE {
+        return inject([&f]() { f(); });
     }
 
     jobs pop_all() {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/examples/cpp/scheduled_send.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/scheduled_send.cpp b/examples/cpp/scheduled_send.cpp
new file mode 100644
index 0000000..280c737
--- /dev/null
+++ b/examples/cpp/scheduled_send.cpp
@@ -0,0 +1,108 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/default_container.hpp>
+#include <proton/messaging_handler.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+// Send messages at a constant rate one per interval. cancel after a timeout.
+class scheduled_sender : public proton::messaging_handler {
+  private:
+    std::string url;
+    proton::sender sender;
+    proton::duration interval, timeout;
+    bool ready, canceled;
+
+  public:
+
+    scheduled_sender(const std::string &s, double d, double t) :
+        url(s),
+        interval(int(d*proton::duration::SECOND.milliseconds())), // Send 
interval.
+        timeout(int(t*proton::duration::SECOND.milliseconds())), // Cancel 
after timeout.
+        ready(true),            // Ready to send.
+        canceled(false)         // Canceled.
+    {}
+
+    void on_container_start(proton::container &c) OVERRIDE {
+        sender = c.open_sender(url);
+        // Call this->cancel after timeout.
+        c.schedule(timeout, [this]() { this->cancel(); });
+         // Start regular ticks every interval.
+        c.schedule(interval, [this]() { this->tick(); });
+    }
+
+    void cancel() {
+        canceled = true;
+        sender.connection().close();
+    }
+
+    void tick() {
+        // Schedule the next tick unless we have been cancelled.
+        if (!canceled)
+            sender.container().schedule(interval, [this]() { this->tick(); });
+        if (sender.credit() > 0) // Only send if we have credit
+            send();
+        else
+            ready = true;  // Set the ready flag, send as soon as we get 
credit.
+    }
+
+    void on_sendable(proton::sender &) OVERRIDE {
+        if (ready)              // We have been ticked since the last send.
+            send();
+    }
+
+    void send() {
+        std::cout << "send" << std::endl;
+        sender.send(proton::message("ping"));
+        ready = false;
+    }
+};
+
+
+int main(int argc, char **argv) {
+    std::string address("127.0.0.1:5672/examples");
+    double interval = 1.0;
+    double timeout = 5.0;
+
+    example::options opts(argc, argv);
+
+    opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
+    opts.add_value(interval, 'i', "interval", "send a message every INTERVAL 
seconds", "INTERVAL");
+    opts.add_value(timeout, 't', "timeout", "stop after T seconds", "T");
+
+    try {
+        opts.parse();
+        scheduled_sender h(address, interval, timeout);
+        proton::default_container(h).run();
+        return 0;
+    } catch (const example::bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/examples/cpp/scheduled_send_03.cpp
----------------------------------------------------------------------
diff --git a/examples/cpp/scheduled_send_03.cpp 
b/examples/cpp/scheduled_send_03.cpp
new file mode 100644
index 0000000..5d3aaac
--- /dev/null
+++ b/examples/cpp/scheduled_send_03.cpp
@@ -0,0 +1,124 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include "options.hpp"
+
+#include <proton/default_container.hpp>
+#include <proton/messaging_handler.hpp>
+
+#include <iostream>
+
+#include "fake_cpp11.hpp"
+
+// Send messages at a constant rate one per interval. cancel after a timeout.
+// This example uses only C++03 features.
+class scheduled_sender : public proton::messaging_handler {
+  private:
+    std::string url;
+    proton::sender sender;
+    proton::duration interval, timeout;
+    bool ready, canceled;
+
+    struct tick_fn : public proton::void_function0 {
+        scheduled_sender& parent;
+        tick_fn(scheduled_sender& ss) : parent(ss) {}
+        void operator()() { parent.tick(); }
+    };
+
+    struct cancel_fn : public proton::void_function0 {
+        scheduled_sender& parent;
+        cancel_fn(scheduled_sender& ss) : parent(ss) {}
+        void operator()() { parent.cancel(); }
+    };
+
+    tick_fn do_tick;
+    cancel_fn do_cancel;
+
+  public:
+
+    scheduled_sender(const std::string &s, double d, double t) :
+        url(s),
+        interval(int(d*proton::duration::SECOND.milliseconds())), // Send 
interval.
+        timeout(int(t*proton::duration::SECOND.milliseconds())), // Cancel 
after timeout.
+        ready(true),            // Ready to send.
+        canceled(false),         // Canceled.
+        do_tick(*this),
+        do_cancel(*this)
+    {}
+
+    void on_container_start(proton::container &c) OVERRIDE {
+        sender = c.open_sender(url);
+        c.schedule(timeout, do_cancel); // Call this->cancel after timeout.
+        c.schedule(interval, do_tick); // Start regular ticks every interval.
+    }
+
+    void cancel() {
+        canceled = true;
+        sender.connection().close();
+    }
+
+    void tick() {
+        if (!canceled) {
+            sender.container().schedule(interval, do_tick); // Next tick
+            if (sender.credit() > 0) // Only send if we have credit
+                send();
+            else
+                ready = true; // Set the ready flag, send as soon as we get 
credit.
+        }
+    }
+
+    void on_sendable(proton::sender &) OVERRIDE {
+        if (ready)              // We have been ticked since the last send.
+            send();
+    }
+
+    void send() {
+        std::cout << "send" << std::endl;
+        sender.send(proton::message("ping"));
+        ready = false;
+    }
+};
+
+
+int main(int argc, char **argv) {
+    std::string address("127.0.0.1:5672/examples");
+    double interval = 1.0;
+    double timeout = 5.0;
+
+    example::options opts(argc, argv);
+
+    opts.add_value(address, 'a', "address", "connect and send to URL", "URL");
+    opts.add_value(interval, 'i', "interval", "send a message every INTERVAL 
seconds", "INTERVAL");
+    opts.add_value(timeout, 't', "timeout", "stop after T seconds", "T");
+
+    try {
+        opts.parse();
+        scheduled_sender h(address, interval, timeout);
+        proton::default_container(h).run();
+        return 0;
+    } catch (const example::bad_option& e) {
+        std::cout << opts << std::endl << e.what() << std::endl;
+    } catch (const std::exception& e) {
+        std::cerr << e.what() << std::endl;
+    }
+
+    return 1;
+}

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/proton-c/bindings/cpp/include/proton/container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/container.hpp 
b/proton-c/bindings/cpp/include/proton/container.hpp
index ed48803..16f63fd 100644
--- a/proton-c/bindings/cpp/include/proton/container.hpp
+++ b/proton-c/bindings/cpp/include/proton/container.hpp
@@ -23,6 +23,7 @@
  */
 
 #include "./connection_options.hpp"
+#include "./function.hpp"
 #include "./listener.hpp"
 #include "./thread_safe.hpp"
 
@@ -198,6 +199,13 @@ class PN_CPP_CLASS_EXTERN container {
     /// @copydoc receiver_options
     virtual class receiver_options receiver_options() const = 0;
 
+#if PN_CPP_HAS_STD_FUNCTION
+    /// Schedule a function to be called after the duration
+    virtual void schedule(duration, std::function<void()>) = 0;
+#endif
+    /// Schedule a function to be called after the duration.
+    /// C++03 compatible, for C++11 use the schedule(duration, 
std::function<void()>)
+    virtual void schedule(duration, void_function0&) = 0;
 };
 
 /// This is an header only class that can be used to help using containers 
more natural
@@ -241,6 +249,11 @@ class container_ref : public container {
 
     std::string id() const { return impl_->id(); }
 
+#if PN_CPP_HAS_STD_FUNCTION
+    PN_CPP_EXTERN void schedule(duration d, std::function<void()> f) { return 
impl_->schedule(d, f); }
+#endif
+    PN_CPP_EXTERN void schedule(duration d, void_function0& f) { return 
impl_->schedule(d, f); }
+
     void client_connection_options(const connection_options& c) { 
impl_->client_connection_options(c); }
     connection_options client_connection_options() const { return 
impl_->client_connection_options(); }
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/proton-c/bindings/cpp/include/proton/event_loop.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/event_loop.hpp 
b/proton-c/bindings/cpp/include/proton/event_loop.hpp
index 6d231b6..ae572ce 100644
--- a/proton-c/bindings/cpp/include/proton/event_loop.hpp
+++ b/proton-c/bindings/cpp/include/proton/event_loop.hpp
@@ -23,6 +23,7 @@
  */
 
 #include "./internal/config.hpp"
+#include "./function.hpp"
 
 #include <functional>
 
@@ -38,13 +39,13 @@ struct pn_link_t;
 namespace proton {
 
 /// **Experimental** - A handler for injected code.
-class inject_handler {
+///
+/// @deprecated use void_function0.
+class inject_handler : public void_function0 {
   public:
     virtual ~inject_handler() {}
-
-    // XXX bad name, should be operator()() to be idiomatic and consistent 
with C++11.
-    /// Called when the injected code is executed.
     virtual void on_inject() = 0;
+    void operator()() { on_inject(); }
 };
 
 /// **Experimental** - A serial execution context.
@@ -62,10 +63,10 @@ class PN_CPP_CLASS_EXTERN event_loop {
     ///
     /// @return true if f() has or will be called, false if the event_loop is 
ended
     /// and f() cannot be injected.
-    virtual bool inject(inject_handler& f) = 0;
+    virtual bool inject(void_function0& f) = 0;
 
-#if PN_CPP_HAS_CPP11
-    /// @copydoc inject(inject_handler&)
+#if PN_CPP_HAS_STD_FUNCTION
+    /// @copydoc inject(void_function0&)
     virtual bool inject(std::function<void()> f) = 0;
 #endif
 

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/proton-c/bindings/cpp/include/proton/function.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/function.hpp 
b/proton-c/bindings/cpp/include/proton/function.hpp
new file mode 100644
index 0000000..4fbbe16
--- /dev/null
+++ b/proton-c/bindings/cpp/include/proton/function.hpp
@@ -0,0 +1,42 @@
+#ifndef PROTON_FUNCTION_HPP
+#define PROTON_FUNCTION_HPP
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+namespace proton {
+
+/// A C++03 compatible void no-argument callback function object, used by
+/// container::schedule() and event_loop::inject()
+/// In C++11 you can use std::bind, std::function or a void-no-argument lambda 
instead.
+///
+/// void_function0 is passed by reference, so instances of sub-classes do not
+/// have to be heap allocated.  Once passed, the instance must not be deleted 
until
+/// its operator() is called or the container has stopped.
+///
+class void_function0 {
+  public:
+    virtual ~void_function0() {}
+    /// Override the call operator with your code.
+    virtual void operator()() = 0;
+};
+
+}
+
+#endif // PROTON_FUNCTION_HPP

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/proton-c/bindings/cpp/include/proton/internal/config.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/internal/config.hpp 
b/proton-c/bindings/cpp/include/proton/internal/config.hpp
index ff7af09..cd5370e 100644
--- a/proton-c/bindings/cpp/include/proton/internal/config.hpp
+++ b/proton-c/bindings/cpp/include/proton/internal/config.hpp
@@ -83,6 +83,10 @@
 #define PN_CPP_HAS_DELETED_FUNCTIONS PN_CPP_HAS_CPP11
 #endif
 
+#ifndef PN_CPP_HAS_STD_FUNCTION
+#define PN_CPP_HAS_STD_FUNCTION PN_CPP_HAS_CPP11
+#endif
+
 #endif // PROTON_INTERNAL_CONFIG_HPP
 
 /// @endcond

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/proton-c/bindings/cpp/include/proton/thread_safe.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/include/proton/thread_safe.hpp 
b/proton-c/bindings/cpp/include/proton/thread_safe.hpp
index 3a4ca96..eb4df48 100644
--- a/proton-c/bindings/cpp/include/proton/thread_safe.hpp
+++ b/proton-c/bindings/cpp/include/proton/thread_safe.hpp
@@ -69,10 +69,10 @@ template <class T>
 class thread_safe : private internal::pn_ptr_base, private 
internal::endpoint_traits<T> {
     typedef typename T::pn_type pn_type;
 
-    struct inject_decref : public inject_handler {
+    struct inject_decref : public void_function0 {
         pn_type* ptr_;
         inject_decref(pn_type* p) : ptr_(p) {}
-        void on_inject() PN_CPP_OVERRIDE { decref(ptr_); delete this; }
+        void operator()() PN_CPP_OVERRIDE { decref(ptr_); delete this; }
     };
 
   public:

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/proton-c/bindings/cpp/src/container_impl.cpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.cpp 
b/proton-c/bindings/cpp/src/container_impl.cpp
index 3f7ad45..163a801 100644
--- a/proton-c/bindings/cpp/src/container_impl.cpp
+++ b/proton-c/bindings/cpp/src/container_impl.cpp
@@ -24,6 +24,7 @@
 #include "proton/connection.hpp"
 #include "proton/session.hpp"
 #include "proton/error.hpp"
+#include "proton/event_loop.hpp"
 #include "proton/sender.hpp"
 #include "proton/receiver.hpp"
 #include "proton/task.hpp"
@@ -156,6 +157,25 @@ container_impl::~container_impl() {
         close_acceptor(i->second);
 }
 
+namespace {
+// FIXME aconway 2016-06-07: this is not thread safe. It is sufficient for 
using
+// default_container::schedule() inside a handler but not for inject() from
+// another thread.
+struct immediate_event_loop : public event_loop {
+    virtual bool inject(void_function0& f) PN_CPP_OVERRIDE {
+        try { f(); } catch(...) {}
+        return true;
+    }
+
+#if PN_CPP_HAS_CPP11
+    virtual bool inject(std::function<void()> f) PN_CPP_OVERRIDE {
+        try { f(); } catch(...) {}
+        return true;
+    }
+#endif
+};
+}
+
 returned<connection> container_impl::connect(const std::string &urlstr, const 
connection_options &user_opts) {
     connection_options opts = client_connection_options(); // Defaults
     opts.update(user_opts);
@@ -167,6 +187,7 @@ returned<connection> container_impl::connect(const 
std::string &urlstr, const co
     internal::pn_unique_ptr<connector> ctor(new connector(conn, url, opts));
     connection_context& cc(connection_context::get(conn));
     cc.handler.reset(ctor.release());
+    cc.event_loop.reset(new immediate_event_loop);
     pn_connection_set_container(unwrap(conn), id_.c_str());
 
     conn.open(opts);
@@ -230,6 +251,43 @@ task container_impl::schedule(int delay, proton_handler 
*h) {
     return reactor_.schedule(delay, task_handler.get());
 }
 
+namespace {
+// Abstract base for timer_handler_std and timer_handler_03
+struct timer_handler : public proton_handler, public void_function0 {
+    void on_timer_task(proton_event& ) PN_CPP_OVERRIDE {
+        (*this)();
+        delete this;
+    }
+    void on_reactor_final(proton_event&) PN_CPP_OVERRIDE {
+        delete this;
+    }
+};
+
+struct timer_handler_03 : public timer_handler {
+    void_function0& func;
+    timer_handler_03(void_function0& f): func(f) {}
+    void operator()() PN_CPP_OVERRIDE { func(); }
+};
+}
+
+void container_impl::schedule(duration delay, void_function0& f) {
+    schedule(delay.milliseconds(), new timer_handler_03(f));
+}
+
+#if PN_CPP_HAS_STD_FUNCTION
+namespace {
+struct timer_handler_std : public timer_handler {
+    std::function<void()> func;
+    timer_handler_std(std::function<void()> f): func(f) {}
+    void operator()() PN_CPP_OVERRIDE { func(); }
+};
+}
+
+void container_impl::schedule(duration delay, std::function<void()> f) {
+    schedule(delay.milliseconds(), new timer_handler_std(f));
+}
+#endif
+
 void container_impl::client_connection_options(const connection_options &opts) 
{
     client_connection_options_ = opts;
 }
@@ -260,6 +318,7 @@ void container_impl::configure_server_connection(connection 
&c) {
         pn_record_t *record = pn_connection_attachments(unwrap(c));
         pn_record_set_handler(record, chandler.get());
     }
+    connection_context::get(c).event_loop.reset(new immediate_event_loop);
 }
 
 void container_impl::run() {

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/proton-c/bindings/cpp/src/container_impl.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/container_impl.hpp 
b/proton-c/bindings/cpp/src/container_impl.hpp
index 8fd64d0..07f2563 100644
--- a/proton-c/bindings/cpp/src/container_impl.hpp
+++ b/proton-c/bindings/cpp/src/container_impl.hpp
@@ -72,6 +72,10 @@ class container_impl : public container {
     void run() PN_CPP_OVERRIDE;
     void stop(const error_condition& err) PN_CPP_OVERRIDE;
     void auto_stop(bool set) PN_CPP_OVERRIDE;
+#if PN_CPP_HAS_STD_FUNCTION
+    void schedule(duration, std::function<void()>) PN_CPP_OVERRIDE;
+#endif
+    void schedule(duration, void_function0&) PN_CPP_OVERRIDE;
 
     // non-interface functions
     void configure_server_connection(connection &c);

http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d44fa2d/proton-c/bindings/cpp/src/test_dummy_container.hpp
----------------------------------------------------------------------
diff --git a/proton-c/bindings/cpp/src/test_dummy_container.hpp 
b/proton-c/bindings/cpp/src/test_dummy_container.hpp
index 7307901..24bb415 100644
--- a/proton-c/bindings/cpp/src/test_dummy_container.hpp
+++ b/proton-c/bindings/cpp/src/test_dummy_container.hpp
@@ -52,6 +52,10 @@ class dummy_container : public container {
     class sender_options sender_options() const { return sopts_; }
     void receiver_options(const class receiver_options &o) { ropts_ = o; }
     class receiver_options receiver_options() const { return ropts_; }
+#if PN_CPP_HAS_STD_FUNCTION
+    void schedule(duration, std::function<void()>) { throw fail; }
+#endif
+    void schedule(duration, void_function0&) { throw fail; }
 
   private:
     std::string id_;
@@ -65,7 +69,7 @@ class dummy_event_loop : public event_loop {
 #if PN_CPP_HAS_CPP11
     bool inject(std::function<void()> f) PN_CPP_OVERRIDE { f(); return true; }
 #endif
-    bool inject(proton::inject_handler& h) PN_CPP_OVERRIDE { h.on_inject(); 
return true; }
+    bool inject(proton::void_function0& h) PN_CPP_OVERRIDE { h(); return true; 
}
 };
 
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to