This is an automated email from the ASF dual-hosted git repository.
astitcher pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/qpid-proton.git
The following commit(s) were added to refs/heads/main by this push:
new 9a89ff4ea PROTON-2438: Add cancellable tasks feature and tests
9a89ff4ea is described below
commit 9a89ff4eadee92ce014616492675b1d7195db4e2
Author: Rakhi Kumari <[email protected]>
AuthorDate: Mon Aug 29 10:14:58 2022 +0530
PROTON-2438: Add cancellable tasks feature and tests
---
cpp/include/proton/container.hpp | 11 +++-
cpp/src/container.cpp | 8 ++-
cpp/src/container_test.cpp | 111 ++++++++++++++++++++++++++++++++++++
cpp/src/proactor_container_impl.cpp | 20 ++++++-
cpp/src/proactor_container_impl.hpp | 7 ++-
5 files changed, 147 insertions(+), 10 deletions(-)
diff --git a/cpp/include/proton/container.hpp b/cpp/include/proton/container.hpp
index f5a4dd655..839cf8137 100644
--- a/cpp/include/proton/container.hpp
+++ b/cpp/include/proton/container.hpp
@@ -35,6 +35,7 @@
/// @copybrief proton::container
namespace proton {
+typedef uint64_t work_handle;
/// A top-level container of connections, sessions, and links.
///
@@ -303,15 +304,19 @@ class PN_CPP_CLASS_EXTERN container {
///
/// **C++ versions** - With C++11 and later, use a
/// `std::function<void()>` type for the `fn` parameter.
- PN_CPP_EXTERN void schedule(duration dur, work fn);
+ PN_CPP_EXTERN work_handle schedule(duration dur, work fn);
/// **Deprecated** - Use `container::schedule(duration, work)`.
- PN_CPP_EXTERN PN_CPP_DEPRECATED("Use 'container::schedule(duration,
work)'") void schedule(duration dur, void_function0& fn);
+ PN_CPP_EXTERN PN_CPP_DEPRECATED("Use 'container::schedule(duration,
work)'") work_handle schedule(duration dur, void_function0& fn);
+
+ /// Cancel the task identified by a work_handle returned from schedule(); a handle that is unknown, already cancelled, or whose task has already run is ignored.
+ PN_CPP_EXTERN void cancel(work_handle);
private:
/// Declare both v03 and v11 if compiling with c++11 as the library
contains both.
/// A C++11 user should never call the v03 overload so it is private in
this case
- PN_CPP_EXTERN void schedule(duration dur, internal::v03::work fn);
+ PN_CPP_EXTERN work_handle schedule(duration dur, internal::v03::work fn);
+
class impl;
std::unique_ptr<impl> impl_;
diff --git a/cpp/src/container.cpp b/cpp/src/container.cpp
index 07972aa11..8189539de 100644
--- a/cpp/src/container.cpp
+++ b/cpp/src/container.cpp
@@ -113,10 +113,12 @@ returned<receiver> container::open_receiver(
std::string container::id() const { return impl_->id(); }
-void container::schedule(duration d, internal::v03::work f) { return
impl_->schedule(d, f); }
-void container::schedule(duration d, internal::v11::work f) { return
impl_->schedule(d, f); }
+// Both work flavours forward to the implementation and hand its work_handle
+// back to the caller so the task can later be passed to cancel().
+work_handle container::schedule(duration d, internal::v03::work f) {
+    return impl_->schedule(d, f);
+}
+work_handle container::schedule(duration d, internal::v11::work f) {
+    return impl_->schedule(d, f);
+}
-void container::schedule(duration d, void_function0& f) { return
impl_->schedule(d, make_work(&void_function0::operator(), &f)); }
+// Deprecated overload: wrap the void_function0 in a work item, schedule that,
+// and return the resulting handle.
+work_handle container::schedule(duration d, void_function0& f) {
+    return impl_->schedule(d, make_work(&void_function0::operator(), &f));
+}
+
+// Forward the cancellation to the implementation. The parameter is named
+// `handle` so that it does not hide the work_handle type inside the body.
+void container::cancel(work_handle handle) { impl_->cancel(handle); }
void container::client_connection_options(const connection_options& c) {
impl_->client_connection_options(c); }
connection_options container::client_connection_options() const { return
impl_->client_connection_options(); }
diff --git a/cpp/src/container_test.cpp b/cpp/src/container_test.cpp
index d49aad9f5..b414bdf67 100644
--- a/cpp/src/container_test.cpp
+++ b/cpp/src/container_test.cpp
@@ -574,6 +574,116 @@ void test_container_mt_close_race() {
}
}
+/// Messaging handler that exercises container::schedule() and container::cancel().
+///
+/// on_container_start() schedules five tasks; a task that runs sets the matching
+/// wN_state member to 1. Tasks 1 and 2 are cancelled straight away, task 3 is
+/// left to run, and tasks 4 and 5 are cancelled from on_connection_close().
+class schedule_cancel : public proton::messaging_handler {
+    proton::listener listener;
+    test_listen_handler listen_handler;
+
+    // Handles returned by container::schedule(), stored using the API's own
+    // handle type rather than a signed integer.
+    proton::work_handle w1_handle;
+    proton::work_handle w2_handle;
+    proton::work_handle w3_handle;
+    proton::work_handle w4_handle;
+    proton::work_handle w5_handle;
+
+    // Task bodies: each one records that its task ran. The container pointer is
+    // bound by make_work() but is not used, so the parameter is left unnamed.
+    void change_w1_state(proton::container*) {
+        w1_state = 1;
+    }
+
+    void change_w2_state(proton::container*) {
+        w2_state = 1;
+    }
+
+    void change_w3_state(proton::container*) {
+        w3_state = 1;
+    }
+
+    void change_w4_state(proton::container*) {
+        w4_state = 1;
+    }
+
+    void change_w5_state(proton::container*) {
+        w5_state = 1;
+    }
+
+    void on_container_start(proton::container& c) override {
+        ASSERT(w1_state==0);
+        ASSERT(w2_state==0);
+        ASSERT(w3_state==0);
+        ASSERT(w4_state==0);
+        ASSERT(w5_state==0);
+
+        listener = c.listen("//:0", listen_handler);
+
+        // We will cancel this scheduled task before its execution.
+        w1_handle = c.schedule(proton::duration(250),
+                               proton::make_work(&schedule_cancel::change_w1_state, this, &c));
+
+        // We will cancel this scheduled task before its execution and will
+        // try to cancel it again.
+        w2_handle = c.schedule(proton::duration(260),
+                               proton::make_work(&schedule_cancel::change_w2_state, this, &c));
+
+        // We will not cancel this scheduled task.
+        w3_handle = c.schedule(proton::duration(35),
+                               proton::make_work(&schedule_cancel::change_w3_state, this, &c));
+
+        // We will try to cancel this task before its execution from a
+        // different thread, i.e. the connection thread.
+        w4_handle = c.schedule(proton::duration(270),
+                               proton::make_work(&schedule_cancel::change_w4_state, this, &c));
+
+        // We will try to cancel this task after its execution from a
+        // different thread, i.e. the connection thread.
+        w5_handle = c.schedule(proton::duration(0),
+                               proton::make_work(&schedule_cancel::change_w5_state, this, &c));
+
+        // Cancel the first scheduled task.
+        c.cancel(w1_handle);
+
+        // Try cancelling the second scheduled task two times.
+        c.cancel(w2_handle);
+        c.cancel(w2_handle);
+
+        // Try cancelling an invalid work handle. The cast makes the conversion
+        // of -1 to the unsigned handle type explicit (same value as before).
+        c.cancel(static_cast<proton::work_handle>(-1));
+        c.auto_stop(false);
+    }
+
+    // Get here twice - once for listener, once for connector
+    void on_connection_open(proton::connection &c) override {
+        c.close();
+    }
+
+    void on_connection_close(proton::connection &c) override {
+        // Cross-thread cancelling
+
+        ASSERT(w4_state==0);
+        // Cancel the fourth task before its execution.
+        c.container().cancel(w4_handle);
+
+        ASSERT(w5_state==1);
+        // Cancel the already executed fifth task.
+        c.container().cancel(w5_handle);
+
+        c.container().stop();
+    }
+
+    void on_transport_error(proton::transport &) override {
+        // Do nothing - ignore transport errors - we're going to get one when
+        // the container stops.
+    }
+
+public:
+    schedule_cancel(): w1_state(0), w2_state(0), w3_state(0), w4_state(0), w5_state(0) {}
+
+    // 0 until the corresponding task runs, then 1.
+    // NOTE(review): these are written by the scheduled tasks and read in
+    // on_connection_close(); the test runs the container with two threads, so
+    // confirm whether plain int is sufficient or std::atomic<int> is needed.
+    int w1_state;
+    int w2_state;
+    int w3_state;
+    int w4_state;
+    int w5_state;
+};
+
+/// Run the schedule_cancel handler on a container with two threads, then check
+/// which of the five scheduled tasks actually executed.
+int test_container_schedule_cancel() {
+    schedule_cancel t;
+    proton::container(t).run(2);
+
+    // w1_state remained 0 because we cancelled the associated task before its execution.
+    ASSERT(t.w1_state==0);
+
+    // w2_state remained 0 because we cancelled the associated task before its execution.
+    ASSERT(t.w2_state==0);
+
+    // w3_state changed to 1 because we hadn't cancelled this task.
+    ASSERT(t.w3_state==1);
+
+    // w4_state remained 0 because we cancelled the associated task before its execution.
+    ASSERT(t.w4_state==0);
+
+    // w5_state changed to 1 because the task was already executed before we cancelled it.
+    ASSERT(t.w5_state==1);
+
+    return 0;
+}
+
} // namespace
int main(int argc, char** argv) {
@@ -595,5 +705,6 @@ int main(int argc, char** argv) {
RUN_ARGV_TEST(failed, test_container_mt_stop_empty());
RUN_ARGV_TEST(failed, test_container_mt_stop());
RUN_ARGV_TEST(failed, test_container_mt_close_race());
+ RUN_ARGV_TEST(failed, test_container_schedule_cancel());
return failed;
}
diff --git a/cpp/src/proactor_container_impl.cpp
b/cpp/src/proactor_container_impl.cpp
index 088e3009f..e965be730 100644
--- a/cpp/src/proactor_container_impl.cpp
+++ b/cpp/src/proactor_container_impl.cpp
@@ -452,12 +452,13 @@ proton::listener container::impl::listen(const
std::string& addr, proton::listen
return proton::listener(listener);
}
-void container::impl::schedule(duration delay, work f) {
+work_handle container::impl::schedule(duration delay, work f) {
GUARD(deferred_lock_);
timestamp now = timestamp::now();
// Record timeout; Add callback to timeout sorted list
- scheduled s = {now+delay, f};
+ scheduled s = {now+delay, f, current_work_handle_};
+ ++current_work_handle_;
deferred_.push_back(s);
std::push_heap(deferred_.begin(), deferred_.end());
@@ -465,6 +466,13 @@ void container::impl::schedule(duration delay, work f) {
scheduled* next = &deferred_.front();
pn_millis_t timeout_ms = (now < next->time) ?
(next->time-now).milliseconds() : 0;
pn_proactor_set_timeout(proactor_, timeout_ms);
+ is_active_.insert(s.w_handle);
+ return s.w_handle;
+}
+
+// Cancel a scheduled task by removing its handle from the active set, so that
+// run_timer_jobs() skips the task when it becomes due. Erasing a handle that
+// is not in the set (never issued, already cancelled, or already run) has no
+// effect. The parameter is named `handle` so it does not hide the work_handle
+// type.
+// NOTE(review): run_timer_jobs() appears to query and erase is_active_ after
+// deferred_lock_ has been released ("run them unlocked") - confirm that this
+// cannot race with cancel()/schedule() running on another thread.
+void container::impl::cancel(work_handle handle) {
+    GUARD(deferred_lock_);
+    is_active_.erase(handle);
}
void container::impl::client_connection_options(const connection_options
&opts) {
@@ -532,7 +540,13 @@ void container::impl::run_timer_jobs() {
// We've now taken the tasks to run from the deferred tasks
// so we can run them unlocked
// NB. We copied the due tasks in reverse order so execute from end
- for (int i = tasks.size()-1; i>=0; --i) tasks[i].task();
+
+ for (int i = tasks.size()-1; i>=0; --i) {
+ if(is_active_.count(tasks[i].w_handle)) {
+ tasks[i].task();
+ is_active_.erase(tasks[i].w_handle);
+ }
+ }
}
// Return true if this thread is finished
diff --git a/cpp/src/proactor_container_impl.hpp
b/cpp/src/proactor_container_impl.hpp
index f3475b0d4..1f87e7636 100644
--- a/cpp/src/proactor_container_impl.hpp
+++ b/cpp/src/proactor_container_impl.hpp
@@ -42,6 +42,7 @@
#include <set>
#include <string>
#include <vector>
+#include <unordered_set>
#include <mutex>
# define MUTEX(x) std::mutex x;
@@ -84,7 +85,8 @@ class container::impl {
void run(int threads);
void stop(const error_condition& err);
void auto_stop(bool set);
- void schedule(duration, work);
+ work_handle schedule(duration, work);
+ void cancel(work_handle);
template <class T> static void set_handler(T s, messaging_handler* h);
template <class T> static messaging_handler* get_handler(T s);
messaging_handler* get_handler(pn_event_t *event);
@@ -125,10 +127,13 @@ class container::impl {
struct scheduled {
timestamp time; // duration from epoch for task
work task;
+ work_handle w_handle; // handle returned by schedule() for this task; looked up in is_active_ before the task is run
// We want to get the *earliest* first so test is "reversed"
bool operator < (const scheduled& r) const { return r.time < time; }
};
+ work_handle current_work_handle_ = 0;
+ std::unordered_set<work_handle> is_active_; // Stores the active
work_handles
std::vector<scheduled> deferred_; // This vector is kept as a heap
MUTEX(deferred_lock_)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]