PROTON-928: cancellable tasks A scheduled task can be cancelled. A cancelled task does not prevent reactor from stopping running
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3 Branch: refs/heads/master Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48 Parents: 09af375 Author: Bozo Dragojevic <[email protected]> Authored: Tue Jul 7 10:17:40 2015 +0200 Committer: Bozo Dragojevic <[email protected]> Committed: Tue Jul 7 21:49:44 2015 +0200 ---------------------------------------------------------------------- proton-c/bindings/python/proton/reactor.py | 5 +++- proton-c/include/proton/reactor.h | 1 + proton-c/src/reactor/timer.c | 25 +++++++++++++++++++- proton-c/src/tests/reactor.c | 15 ++++++++++++ .../org/apache/qpid/proton/reactor/Task.java | 4 ++++ .../qpid/proton/reactor/impl/TaskImpl.java | 10 ++++++++ .../apache/qpid/proton/reactor/impl/Timer.java | 19 ++++++++++++--- proton-j/src/main/resources/creactor.py | 3 +++ tests/python/proton_tests/reactor.py | 14 +++++++++++ 9 files changed, 91 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py ---------------------------------------------------------------------- diff --git a/proton-c/bindings/python/proton/reactor.py b/proton-c/bindings/python/proton/reactor.py index c66334b..d019554 100644 --- a/proton-c/bindings/python/proton/reactor.py +++ b/proton-c/bindings/python/proton/reactor.py @@ -53,6 +53,9 @@ class Task(Wrapper): def _init(self): pass + def cancel(self): + pn_task_cancel(self._impl) + class Acceptor(Wrapper): def __init__(self, impl): @@ -112,7 +115,7 @@ class Reactor(Wrapper): pn_reactor_yield(self._impl) def mark(self): - pn_reactor_mark(self._impl) + return pn_reactor_mark(self._impl) def _get_handler(self): return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), self.on_error) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h ---------------------------------------------------------------------- diff --git a/proton-c/include/proton/reactor.h b/proton-c/include/proton/reactor.h index 59b2282..6f52d22 100644 --- a/proton-c/include/proton/reactor.h +++ b/proton-c/include/proton/reactor.h @@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer, pn_timestamp_t deadlin PN_EXTERN int pn_timer_tasks(pn_timer_t *timer); PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task); +PN_EXTERN void pn_task_cancel(pn_task_t *task); PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void *object); PN_EXTERN pn_reactor_t *pn_object_reactor(void *object); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c ---------------------------------------------------------------------- diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c index 1ad0821..61efd31 100644 --- a/proton-c/src/reactor/timer.c +++ b/proton-c/src/reactor/timer.c @@ -27,12 +27,14 @@ struct pn_task_t { pn_list_t *pool; pn_record_t *attachments; pn_timestamp_t deadline; + bool cancelled; }; void pn_task_initialize(pn_task_t *task) { task->pool = NULL; task->attachments = pn_record(); task->deadline = 0; + task->cancelled = false; } void pn_task_finalize(pn_task_t *task) { @@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) { return task->attachments; } +void pn_task_cancel(pn_task_t *task) { + assert(task); + task->cancelled = true; +} + // // timer // @@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer, pn_timestamp_t deadline) { return task; } +void pni_timer_flush_cancelled(pn_timer_t *timer) { + while (pn_list_size(timer->tasks)) { + pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0); + if (task->cancelled) { + pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks); + assert(min == task); + pn_decref(min); + } else { + break; + } + } +} + pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) { assert(timer); + pni_timer_flush_cancelled(timer); if (pn_list_size(timer->tasks)) { pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0); return task->deadline; @@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now) { if (now >= task->deadline) { pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks); assert(min == task); - pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK); + if (!min->cancelled) + pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK); pn_decref(min); } else { break; @@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now) { int pn_timer_tasks(pn_timer_t *timer) { assert(timer); + pni_timer_flush_cancelled(timer); return pn_list_size(timer->tasks); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c ---------------------------------------------------------------------- diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c index fe6c769..059d099 100644 --- a/proton-c/src/tests/reactor.c +++ b/proton-c/src/tests/reactor.c @@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) { pn_free(tevents); } +static void test_reactor_schedule_cancel(void) { + pn_reactor_t *reactor = pn_reactor(); + pn_handler_t *root = pn_reactor_get_handler(reactor); + pn_list_t *events = pn_list(PN_VOID, 0); + pn_handler_add(root, test_handler(reactor, events)); + pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL); + pn_task_cancel(task); + pn_reactor_run(reactor); + pn_reactor_free(reactor); + expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, + PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END); + pn_free(events); +} + int main(int argc, char **argv) { test_reactor(); @@ -461,5 +475,6 @@ int main(int argc, char **argv) test_reactor_transfer(4*1024, 1024); test_reactor_schedule(); test_reactor_schedule_handler(); + test_reactor_schedule_cancel(); return 0; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java index 69701ab..7fb5964 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java @@ -43,4 +43,8 @@ public interface Task extends Extendable { /** @return the reactor that created this task. */ Reactor getReactor(); + /** + * Cancel the execution of this task. No-op if invoked after the task was already executed. + */ + void cancel(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java index 00c9a84..11bb6b8 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java @@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task; public class TaskImpl implements Task, Comparable<TaskImpl> { private final long deadline; private final int counter; + private boolean cancelled = false; private final AtomicInteger count = new AtomicInteger(); private Record attachments = new RecordImpl(); private Reactor reactor; @@ -58,6 +59,15 @@ public class TaskImpl implements Task, Comparable<TaskImpl> { return deadline; } + public boolean isCancelled() { + return cancelled; + } + + @Override + public void cancel() { + cancelled = true; + } + public void setReactor(Reactor reactor) { this.reactor = reactor; } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java index 32bb4f6..b8df19d 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java @@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task; public class Timer { private CollectorImpl collector; - private PriorityQueue<Task> tasks = new PriorityQueue<Task>(); + private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>(); public Timer(Collector collector) { this.collector = (CollectorImpl)collector; @@ -44,6 +44,7 @@ public class Timer { } long deadline() { + flushCancelled(); if (tasks.size() > 0) { Task task = tasks.peek(); return task.deadline(); @@ -52,12 +53,23 @@ public class Timer { } } + private void flushCancelled() { + while (!tasks.isEmpty()) { + TaskImpl task = tasks.peek(); + if (task.isCancelled()) + tasks.poll(); + else + break; + } + } + void tick(long now) { while(!tasks.isEmpty()) { - Task task = tasks.peek(); + TaskImpl task = tasks.peek(); if (now >= task.deadline()) { tasks.poll(); - collector.put(Type.TIMER_TASK, task); + if (!task.isCancelled()) + collector.put(Type.TIMER_TASK, task); } else { break; } @@ -65,6 +77,7 @@ public class Timer { } int tasks() { + flushCancelled(); return tasks.size(); } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/creactor.py b/proton-j/src/main/resources/creactor.py index e179b23..1f8514e 100644 --- a/proton-j/src/main/resources/creactor.py +++ b/proton-j/src/main/resources/creactor.py @@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd): def pn_acceptor_close(a): a.close() +def pn_task_cancel(t): + t.cancel() + def pn_object_reactor(o): if hasattr(o, "impl"): if hasattr(o.impl, "getSession"): http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/reactor.py b/tests/python/proton_tests/reactor.py index 6afee30..067c5c0 100644 --- a/tests/python/proton_tests/reactor.py +++ b/tests/python/proton_tests/reactor.py @@ -171,3 +171,17 @@ class ExceptionTest(Test): assert False, "expected to barf" except Barf: pass + + def test_schedule_cancel(self): + barf = self.reactor.schedule(10, BarfOnTask()) + class CancelBarf: + def on_timer_task(self, event): + barf.cancel() + self.reactor.schedule(0, CancelBarf()) + now = self.reactor.mark() + try: + self.reactor.run() + elapsed = self.reactor.mark() - now + assert elapsed < 10, "expected cancelled task to not delay the reactor by " + elapsed + except Barf: + assert False, "expected barf to be cancelled" --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
