Updated Branches: refs/heads/master e15c50c91 -> ed5f024d4
ODE-942: fixing a scheduler bug. Thanks Waruna for the patch. Project: http://git-wip-us.apache.org/repos/asf/ode/repo Commit: http://git-wip-us.apache.org/repos/asf/ode/commit/ed5f024d Tree: http://git-wip-us.apache.org/repos/asf/ode/tree/ed5f024d Diff: http://git-wip-us.apache.org/repos/asf/ode/diff/ed5f024d Branch: refs/heads/master Commit: ed5f024d4ba697b559476bd31589a2b604e3cce7 Parents: e15c50c Author: Tammo van Lessen <[email protected]> Authored: Sun Sep 15 22:22:42 2013 +0200 Committer: Tammo van Lessen <[email protected]> Committed: Sun Sep 15 22:23:42 2013 +0200 ---------------------------------------------------------------------- .../ode/scheduler/simple/SimpleScheduler.java | 45 ++++++++++++++++++-- 1 file changed, 41 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/ode/blob/ed5f024d/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java ---------------------------------------------------------------------- diff --git a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java index a56b86e..e10a84d 100644 --- a/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java +++ b/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java @@ -379,12 +379,12 @@ public class SimpleScheduler implements Scheduler, TaskRunner { boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval; try { if (immediate) { - // Immediate scheduling means we put it in the DB for safe keeping - _db.insertJob(job, _nodeId, true); - + // Immediate scheduling means we put it in the DB for safe keeping. // And add it to our todo list . if (_outstandingJobs.size() < _todoLimit) { - addTodoOnCommit(job); + saveAndAddTodoOnCommit(job, _nodeId, true); + } else { + saveTodoOnCommit(job, _nodeId, true); } if (__log.isDebugEnabled()) { __log.debug("scheduled immediate job: " + job.jobId); @@ -644,6 +644,43 @@ public class SimpleScheduler implements Scheduler, TaskRunner { _exec.submit(new RunJob(job, _polledRunnableProcessor)); } + private void saveAndAddTodoOnCommit(final Job job, final String nodeId, final boolean loaded) { + registerSynchronizer(new Synchronizer() { + public void afterCompletion(boolean success) { + if (success) { + try { + _db.insertJob(job, nodeId, loaded); + } catch (DatabaseException dbe) { + __log.error("Could not save job on commit. Will add it to in-mem queue anyway.", dbe); + throw new ContextException(dbe.getMessage(), dbe); + } + enqueue(job); + } + } + + public void beforeCompletion() { + } + }); + } + + private void saveTodoOnCommit(final Job job, final String nodeId, final boolean loaded) { + registerSynchronizer(new Synchronizer() { + public void afterCompletion(boolean success) { + if (success) { + try { + _db.insertJob(job, nodeId, loaded); + } catch (DatabaseException dbe) { + __log.error("Could not save job on commit.", dbe); + throw new ContextException(dbe.getMessage(), dbe); + } + } + } + + public void beforeCompletion() { + } + }); + } + private void addTodoOnCommit(final Job job) { registerSynchronizer(new Synchronizer() { public void afterCompletion(boolean success) {
