Hi Bozzo, Can you please revert this change?
It is causing a segfault in the python unit tests when they are run under python3.4. I haven't hit the segfault on python2.7, only on python3.4 thanks, -K ----- Original Message ----- > From: [email protected] > To: [email protected] > Sent: Tuesday, July 7, 2015 3:50:16 PM > Subject: [2/2] qpid-proton git commit: PROTON-928: cancellable tasks > > PROTON-928: cancellable tasks > > A scheduled task can be cancelled. > A cancelled task does not prevent reactor from stopping running > > > Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo > Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/d4d22ee3 > Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/d4d22ee3 > Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/d4d22ee3 > > Branch: refs/heads/master > Commit: d4d22ee396163babcac19c48845b1f10ca3b5a48 > Parents: 09af375 > Author: Bozo Dragojevic <[email protected]> > Authored: Tue Jul 7 10:17:40 2015 +0200 > Committer: Bozo Dragojevic <[email protected]> > Committed: Tue Jul 7 21:49:44 2015 +0200 > > ---------------------------------------------------------------------- > proton-c/bindings/python/proton/reactor.py | 5 +++- > proton-c/include/proton/reactor.h | 1 + > proton-c/src/reactor/timer.c | 25 +++++++++++++++++++- > proton-c/src/tests/reactor.c | 15 ++++++++++++ > .../org/apache/qpid/proton/reactor/Task.java | 4 ++++ > .../qpid/proton/reactor/impl/TaskImpl.java | 10 ++++++++ > .../apache/qpid/proton/reactor/impl/Timer.java | 19 ++++++++++++--- > proton-j/src/main/resources/creactor.py | 3 +++ > tests/python/proton_tests/reactor.py | 14 +++++++++++ > 9 files changed, 91 insertions(+), 5 deletions(-) > ---------------------------------------------------------------------- > > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/bindings/python/proton/reactor.py > ---------------------------------------------------------------------- > diff --git a/proton-c/bindings/python/proton/reactor.py > b/proton-c/bindings/python/proton/reactor.py > index c66334b..d019554 100644 > --- a/proton-c/bindings/python/proton/reactor.py > +++ b/proton-c/bindings/python/proton/reactor.py > @@ -53,6 +53,9 @@ class Task(Wrapper): > def _init(self): > pass > > + def cancel(self): > + pn_task_cancel(self._impl) > + > class Acceptor(Wrapper): > > def __init__(self, impl): > @@ -112,7 +115,7 @@ class Reactor(Wrapper): > pn_reactor_yield(self._impl) > > def mark(self): > - pn_reactor_mark(self._impl) > + return pn_reactor_mark(self._impl) > > def _get_handler(self): > return WrappedHandler.wrap(pn_reactor_get_handler(self._impl), > self.on_error) > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/include/proton/reactor.h > ---------------------------------------------------------------------- > diff --git a/proton-c/include/proton/reactor.h > b/proton-c/include/proton/reactor.h > index 59b2282..6f52d22 100644 > --- a/proton-c/include/proton/reactor.h > +++ b/proton-c/include/proton/reactor.h > @@ -96,6 +96,7 @@ PN_EXTERN pn_task_t *pn_timer_schedule(pn_timer_t *timer, > pn_timestamp_t deadlin > PN_EXTERN int pn_timer_tasks(pn_timer_t *timer); > > PN_EXTERN pn_record_t *pn_task_attachments(pn_task_t *task); > +PN_EXTERN void pn_task_cancel(pn_task_t *task); > > PN_EXTERN pn_reactor_t *pn_class_reactor(const pn_class_t *clazz, void > *object); > PN_EXTERN pn_reactor_t *pn_object_reactor(void *object); > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/reactor/timer.c > ---------------------------------------------------------------------- > diff --git a/proton-c/src/reactor/timer.c b/proton-c/src/reactor/timer.c > index 1ad0821..61efd31 100644 > --- a/proton-c/src/reactor/timer.c > +++ b/proton-c/src/reactor/timer.c > @@ -27,12 +27,14 @@ struct pn_task_t { > pn_list_t *pool; > pn_record_t *attachments; > pn_timestamp_t deadline; > + bool cancelled; > }; > > void pn_task_initialize(pn_task_t *task) { > task->pool = NULL; > task->attachments = pn_record(); > task->deadline = 0; > + task->cancelled = false; > } > > void pn_task_finalize(pn_task_t *task) { > @@ -68,6 +70,11 @@ pn_record_t *pn_task_attachments(pn_task_t *task) { > return task->attachments; > } > > +void pn_task_cancel(pn_task_t *task) { > + assert(task); > + task->cancelled = true; > +} > + > // > // timer > // > @@ -113,8 +120,22 @@ pn_task_t *pn_timer_schedule(pn_timer_t *timer, > pn_timestamp_t deadline) { > return task; > } > > +void pni_timer_flush_cancelled(pn_timer_t *timer) { > + while (pn_list_size(timer->tasks)) { > + pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0); > + if (task->cancelled) { > + pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks); > + assert(min == task); > + pn_decref(min); > + } else { > + break; > + } > + } > +} > + > pn_timestamp_t pn_timer_deadline(pn_timer_t *timer) { > assert(timer); > + pni_timer_flush_cancelled(timer); > if (pn_list_size(timer->tasks)) { > pn_task_t *task = (pn_task_t *) pn_list_get(timer->tasks, 0); > return task->deadline; > @@ -130,7 +151,8 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now) > { > if (now >= task->deadline) { > pn_task_t *min = (pn_task_t *) pn_list_minpop(timer->tasks); > assert(min == task); > - pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK); > + if (!min->cancelled) > + pn_collector_put(timer->collector, PN_OBJECT, min, PN_TIMER_TASK); > pn_decref(min); > } else { > break; > @@ -140,5 +162,6 @@ void pn_timer_tick(pn_timer_t *timer, pn_timestamp_t now) > { > > int pn_timer_tasks(pn_timer_t *timer) { > assert(timer); > + pni_timer_flush_cancelled(timer); > return pn_list_size(timer->tasks); > } > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-c/src/tests/reactor.c > ---------------------------------------------------------------------- > diff --git a/proton-c/src/tests/reactor.c b/proton-c/src/tests/reactor.c > index fe6c769..059d099 100644 > --- a/proton-c/src/tests/reactor.c > +++ b/proton-c/src/tests/reactor.c > @@ -440,6 +440,20 @@ static void test_reactor_schedule_handler(void) { > pn_free(tevents); > } > > +static void test_reactor_schedule_cancel(void) { > + pn_reactor_t *reactor = pn_reactor(); > + pn_handler_t *root = pn_reactor_get_handler(reactor); > + pn_list_t *events = pn_list(PN_VOID, 0); > + pn_handler_add(root, test_handler(reactor, events)); > + pn_task_t *task = pn_reactor_schedule(reactor, 0, NULL); > + pn_task_cancel(task); > + pn_reactor_run(reactor); > + pn_reactor_free(reactor); > + expect(events, PN_REACTOR_INIT, PN_SELECTABLE_INIT, PN_SELECTABLE_UPDATED, > + PN_SELECTABLE_FINAL, PN_REACTOR_FINAL, END); > + pn_free(events); > +} > + > int main(int argc, char **argv) > { > test_reactor(); > @@ -461,5 +475,6 @@ int main(int argc, char **argv) > test_reactor_transfer(4*1024, 1024); > test_reactor_schedule(); > test_reactor_schedule_handler(); > + test_reactor_schedule_cancel(); > return 0; > } > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java > ---------------------------------------------------------------------- > diff --git a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java > b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java > index 69701ab..7fb5964 100644 > --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java > +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/Task.java > @@ -43,4 +43,8 @@ public interface Task extends Extendable { > /** @return the reactor that created this task. */ > Reactor getReactor(); > > + /** > + * Cancel the execution of this task. No-op if invoked after the task > was already executed. > + */ > + void cancel(); > } > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java > ---------------------------------------------------------------------- > diff --git > a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java > b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java > index 00c9a84..11bb6b8 100644 > --- > a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java > +++ > b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/TaskImpl.java > @@ -31,6 +31,7 @@ import org.apache.qpid.proton.reactor.Task; > public class TaskImpl implements Task, Comparable<TaskImpl> { > private final long deadline; > private final int counter; > + private boolean cancelled = false; > private final AtomicInteger count = new AtomicInteger(); > private Record attachments = new RecordImpl(); > private Reactor reactor; > @@ -58,6 +59,15 @@ public class TaskImpl implements Task, > Comparable<TaskImpl> { > return deadline; > } > > + public boolean isCancelled() { > + return cancelled; > + } > + > + @Override > + public void cancel() { > + cancelled = true; > + } > + > public void setReactor(Reactor reactor) { > this.reactor = reactor; > } > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java > ---------------------------------------------------------------------- > diff --git > a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java > b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java > index 32bb4f6..b8df19d 100644 > --- a/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java > +++ b/proton-j/src/main/java/org/apache/qpid/proton/reactor/impl/Timer.java > @@ -31,7 +31,7 @@ import org.apache.qpid.proton.reactor.Task; > public class Timer { > > private CollectorImpl collector; > - private PriorityQueue<Task> tasks = new PriorityQueue<Task>(); > + private PriorityQueue<TaskImpl> tasks = new PriorityQueue<TaskImpl>(); > > public Timer(Collector collector) { > this.collector = (CollectorImpl)collector; > @@ -44,6 +44,7 @@ public class Timer { > } > > long deadline() { > + flushCancelled(); > if (tasks.size() > 0) { > Task task = tasks.peek(); > return task.deadline(); > @@ -52,12 +53,23 @@ public class Timer { > } > } > > + private void flushCancelled() { > + while (!tasks.isEmpty()) { > + TaskImpl task = tasks.peek(); > + if (task.isCancelled()) > + tasks.poll(); > + else > + break; > + } > + } > + > void tick(long now) { > while(!tasks.isEmpty()) { > - Task task = tasks.peek(); > + TaskImpl task = tasks.peek(); > if (now >= task.deadline()) { > tasks.poll(); > - collector.put(Type.TIMER_TASK, task); > + if (!task.isCancelled()) > + collector.put(Type.TIMER_TASK, task); > } else { > break; > } > @@ -65,6 +77,7 @@ public class Timer { > } > > int tasks() { > + flushCancelled(); > return tasks.size(); > } > } > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/proton-j/src/main/resources/creactor.py > ---------------------------------------------------------------------- > diff --git a/proton-j/src/main/resources/creactor.py > b/proton-j/src/main/resources/creactor.py > index e179b23..1f8514e 100644 > --- a/proton-j/src/main/resources/creactor.py > +++ b/proton-j/src/main/resources/creactor.py > @@ -78,6 +78,9 @@ def pn_selectable_set_fd(s, fd): > def pn_acceptor_close(a): > a.close() > > +def pn_task_cancel(t): > + t.cancel() > + > def pn_object_reactor(o): > if hasattr(o, "impl"): > if hasattr(o.impl, "getSession"): > > http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/d4d22ee3/tests/python/proton_tests/reactor.py > ---------------------------------------------------------------------- > diff --git a/tests/python/proton_tests/reactor.py > b/tests/python/proton_tests/reactor.py > index 6afee30..067c5c0 100644 > --- a/tests/python/proton_tests/reactor.py > +++ b/tests/python/proton_tests/reactor.py > @@ -171,3 +171,17 @@ class ExceptionTest(Test): > assert False, "expected to barf" > except Barf: > pass > + > + def test_schedule_cancel(self): > + barf = self.reactor.schedule(10, BarfOnTask()) > + class CancelBarf: > + def on_timer_task(self, event): > + barf.cancel() > + self.reactor.schedule(0, CancelBarf()) > + now = self.reactor.mark() > + try: > + self.reactor.run() > + elapsed = self.reactor.mark() - now > + assert elapsed < 10, "expected cancelled task to not delay the > reactor by " + elapsed > + except Barf: > + assert False, "expected barf to be cancelled" > > > --------------------------------------------------------------------- > To unsubscribe, e-mail: [email protected] > For additional commands, e-mail: [email protected] > > -- -K --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
