This is an automated email from the ASF dual-hosted git repository.

astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git


The following commit(s) were added to refs/heads/main by this push:
     new 9a89ff4ea PROTON-2438: Add cancellable tasks feature and tests
9a89ff4ea is described below

commit 9a89ff4eadee92ce014616492675b1d7195db4e2
Author: Rakhi Kumari <[email protected]>
AuthorDate: Mon Aug 29 10:14:58 2022 +0530

    PROTON-2438: Add cancellable tasks feature and tests
---
 cpp/include/proton/container.hpp    |  11 +++-
 cpp/src/container.cpp               |   8 ++-
 cpp/src/container_test.cpp          | 111 ++++++++++++++++++++++++++++++++++++
 cpp/src/proactor_container_impl.cpp |  20 ++++++-
 cpp/src/proactor_container_impl.hpp |   7 ++-
 5 files changed, 147 insertions(+), 10 deletions(-)

diff --git a/cpp/include/proton/container.hpp b/cpp/include/proton/container.hpp
index f5a4dd655..839cf8137 100644
--- a/cpp/include/proton/container.hpp
+++ b/cpp/include/proton/container.hpp
@@ -35,6 +35,7 @@
 /// @copybrief proton::container
 
 namespace proton {
+typedef uint64_t work_handle;
 
 /// A top-level container of connections, sessions, and links.
 ///
@@ -303,15 +304,19 @@ class PN_CPP_CLASS_EXTERN container {
     ///
     /// **C++ versions** - With C++11 and later, use a
     /// `std::function<void()>` type for the `fn` parameter.
-    PN_CPP_EXTERN void schedule(duration dur, work fn);
+    PN_CPP_EXTERN work_handle schedule(duration dur, work fn);
 
     /// **Deprecated** - Use `container::schedule(duration, work)`.
-    PN_CPP_EXTERN PN_CPP_DEPRECATED("Use 'container::schedule(duration, work)'") void schedule(duration dur, void_function0& fn);
+    PN_CPP_EXTERN PN_CPP_DEPRECATED("Use 'container::schedule(duration, work)'") work_handle schedule(duration dur, void_function0& fn);
+
+    /// Cancel task for the given work_handle.
+    PN_CPP_EXTERN void cancel(work_handle);
 
   private:
     /// Declare both v03 and v11 if compiling with c++11 as the library contains both.
     /// A C++11 user should never call the v03 overload so it is private in this case
-    PN_CPP_EXTERN void schedule(duration dur, internal::v03::work fn);
+    PN_CPP_EXTERN work_handle schedule(duration dur, internal::v03::work fn);
+
     class impl;
     std::unique_ptr<impl> impl_;
 
diff --git a/cpp/src/container.cpp b/cpp/src/container.cpp
index 07972aa11..8189539de 100644
--- a/cpp/src/container.cpp
+++ b/cpp/src/container.cpp
@@ -113,10 +113,12 @@ returned<receiver> container::open_receiver(
 
 std::string container::id() const { return impl_->id(); }
 
-void container::schedule(duration d, internal::v03::work f) { return impl_->schedule(d, f); }
-void container::schedule(duration d, internal::v11::work f) { return impl_->schedule(d, f); }
+work_handle container::schedule(duration d, internal::v03::work f) { return impl_->schedule(d, f); }
+work_handle container::schedule(duration d, internal::v11::work f) { return impl_->schedule(d, f); }
 
-void container::schedule(duration d, void_function0& f) { return impl_->schedule(d, make_work(&void_function0::operator(), &f)); }
+work_handle container::schedule(duration d, void_function0& f) { return impl_->schedule(d, make_work(&void_function0::operator(), &f)); }
+
+void container::cancel(work_handle work_handle) { impl_->cancel(work_handle); }
 
 void container::client_connection_options(const connection_options& c) { impl_->client_connection_options(c); }
 connection_options container::client_connection_options() const { return impl_->client_connection_options(); }
diff --git a/cpp/src/container_test.cpp b/cpp/src/container_test.cpp
index d49aad9f5..b414bdf67 100644
--- a/cpp/src/container_test.cpp
+++ b/cpp/src/container_test.cpp
@@ -574,6 +574,116 @@ void test_container_mt_close_race() {
     }
 }
 
+class schedule_cancel : public proton::messaging_handler {
+    proton::listener listener;
+    test_listen_handler listen_handler;
+    long long w1_handle;
+    long long w2_handle;
+    long long w3_handle;
+    long long w4_handle;
+    long long w5_handle;
+
+    void change_w1_state(proton::container* c) {
+        w1_state = 1;
+    }
+
+    void change_w2_state(proton::container* c) {
+        w2_state = 1;
+    }
+
+    void change_w3_state(proton::container* c) {
+        w3_state = 1;
+    }
+
+    void change_w4_state(proton::container* c) {
+        w4_state = 1;
+    }
+
+    void change_w5_state(proton::container* c) {
+        w5_state = 1;
+    }
+
+    void on_container_start(proton::container& c) override {
+        ASSERT(w1_state==0);
+        ASSERT(w2_state==0);
+        ASSERT(w3_state==0);
+        ASSERT(w4_state==0);
+        ASSERT(w5_state==0);
+
+        listener = c.listen("//:0", listen_handler);
+
+        // We will cancel this scheduled task before its execution.
+        w1_handle = c.schedule(proton::duration(250), proton::make_work(&schedule_cancel::change_w1_state, this, &c));
+
+        // We will cancel this scheduled task before its execution and will try to cancel it again.
+        w2_handle = c.schedule(proton::duration(260), proton::make_work(&schedule_cancel::change_w2_state, this, &c));
+
+        // We will not cancel this scheduled task.
+        w3_handle = c.schedule(proton::duration(35), proton::make_work(&schedule_cancel::change_w3_state, this, &c));
+
+        // We will try to cancel this task before its execution from different thread i.e connection thread.
+        w4_handle = c.schedule(proton::duration(270), proton::make_work(&schedule_cancel::change_w4_state, this, &c));
+
+        // We will try to cancel this task after its execution from different thread i.e. connection thread.
+        w5_handle = c.schedule(proton::duration(0), proton::make_work(&schedule_cancel::change_w5_state, this, &c));
+
+        // Cancel the first scheduled task.
+        c.cancel(w1_handle);
+
+        // Try cancelling the second scheduled task two times.
+        c.cancel(w2_handle);
+        c.cancel(w2_handle);
+
+        // Try cancelling invalid work handle.
+        c.cancel(-1);
+        c.auto_stop(false);
+    }
+
+    // Get here twice - once for listener, once for connector
+    void on_connection_open(proton::connection &c) override {
+        c.close();
+    }
+
+    void on_connection_close(proton::connection &c) override {
+        // Cross-thread cancelling
+
+        ASSERT(w4_state==0);
+        // Cancel the fourth task before its execution.
+        c.container().cancel(w4_handle);
+
+        ASSERT(w5_state==1);
+        // Cancel the already executed fifth task.
+        c.container().cancel(w5_handle);
+
+        c.container().stop();
+    }
+
+    void on_transport_error(proton::transport & t) override {
+        // Do nothing - ignore transport errors - we're going to get one when
+        // the container stops.
+    }
+
+public:
+    schedule_cancel(): w1_state(0), w2_state(0), w3_state(0), w4_state(0), w5_state(0) {}
+
+    int w1_state;
+    int w2_state;
+    int w3_state;
+    int w4_state;
+    int w5_state;
+};
+
+int test_container_schedule_cancel() {
+    schedule_cancel t;
+    proton::container(t).run(2);
+    ASSERT(t.w1_state==0); // The value of w1_state remained 0 because we cancelled the associated task before its execution.
+    ASSERT(t.w2_state==0); // The value of w2_state remained 0 because we cancelled the associated task before its execution.
+    ASSERT(t.w3_state==1); // The value of w3_state changed to 1 because we hadn't cancelled this task.
+    ASSERT(t.w4_state==0); // The value of w4_state remained 0 because we cancelled the associated task before its execution.
+    ASSERT(t.w5_state==1); // The value of w5_state changed to 1 because the task was already executed before we cancelled it.
+    return 0;
+}
+
 } // namespace
 
 int main(int argc, char** argv) {
@@ -595,5 +705,6 @@ int main(int argc, char** argv) {
     RUN_ARGV_TEST(failed, test_container_mt_stop_empty());
     RUN_ARGV_TEST(failed, test_container_mt_stop());
     RUN_ARGV_TEST(failed, test_container_mt_close_race());
+    RUN_ARGV_TEST(failed, test_container_schedule_cancel());
     return failed;
 }
diff --git a/cpp/src/proactor_container_impl.cpp b/cpp/src/proactor_container_impl.cpp
index 088e3009f..e965be730 100644
--- a/cpp/src/proactor_container_impl.cpp
+++ b/cpp/src/proactor_container_impl.cpp
@@ -452,12 +452,13 @@ proton::listener container::impl::listen(const std::string& addr, proton::listen
     return proton::listener(listener);
 }
 
-void container::impl::schedule(duration delay, work f) {
+work_handle container::impl::schedule(duration delay, work f) {
     GUARD(deferred_lock_);
     timestamp now = timestamp::now();
 
     // Record timeout; Add callback to timeout sorted list
-    scheduled s = {now+delay, f};
+    scheduled s = {now+delay, f, current_work_handle_};
+    ++current_work_handle_;
     deferred_.push_back(s);
     std::push_heap(deferred_.begin(), deferred_.end());
 
@@ -465,6 +466,13 @@ void container::impl::schedule(duration delay, work f) {
     scheduled* next = &deferred_.front();
     pn_millis_t timeout_ms = (now < next->time) ? (next->time-now).milliseconds() : 0;
     pn_proactor_set_timeout(proactor_, timeout_ms);
+    is_active_.insert(s.w_handle);
+    return s.w_handle;
+}
+
+void container::impl::cancel(work_handle work_handle) {
+    GUARD(deferred_lock_);
+    is_active_.erase(work_handle);
 }
 
 void container::impl::client_connection_options(const connection_options &opts) {
@@ -532,7 +540,13 @@ void container::impl::run_timer_jobs() {
     // We've now taken the tasks to run from the deferred tasks
     // so we can run them unlocked
     // NB. We copied the due tasks in reverse order so execute from end
-    for (int i = tasks.size()-1; i>=0; --i) tasks[i].task();
+
+    for (int i = tasks.size()-1; i>=0; --i) {
+        if(is_active_.count(tasks[i].w_handle)) {
+            tasks[i].task();
+            is_active_.erase(tasks[i].w_handle);
+        }
+    }
 }
 
 // Return true if this thread is finished
diff --git a/cpp/src/proactor_container_impl.hpp b/cpp/src/proactor_container_impl.hpp
index f3475b0d4..1f87e7636 100644
--- a/cpp/src/proactor_container_impl.hpp
+++ b/cpp/src/proactor_container_impl.hpp
@@ -42,6 +42,7 @@
 #include <set>
 #include <string>
 #include <vector>
+#include <unordered_set>
 
 #include <mutex>
 # define MUTEX(x) std::mutex x;
@@ -84,7 +85,8 @@ class container::impl {
     void run(int threads);
     void stop(const error_condition& err);
     void auto_stop(bool set);
-    void schedule(duration, work);
+    work_handle schedule(duration, work);
+    void cancel(work_handle);
     template <class T> static void set_handler(T s, messaging_handler* h);
     template <class T> static messaging_handler* get_handler(T s);
     messaging_handler* get_handler(pn_event_t *event);
@@ -125,10 +127,13 @@ class container::impl {
     struct scheduled {
         timestamp time; // duration from epoch for task
         work task;
+        work_handle w_handle;
 
         // We want to get to get the *earliest* first so test is "reversed"
         bool operator < (const scheduled& r) const { return  r.time < time; }
     };
+    work_handle current_work_handle_ = 0;
+    std::unordered_set<work_handle> is_active_; // Stores the active work_handles
     std::vector<scheduled> deferred_; // This vector is kept as a heap
     MUTEX(deferred_lock_)
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to