Author: midon
Date: Sat Oct 10 06:28:56 2009
New Revision: 823804
URL: http://svn.apache.org/viewvc?rev=823804&view=rev
Log:
Clean up the code to avoid confusing duplication and apply the same
retry policy to all job types
Modified:
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Modified:
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=823804&r1=823803&r2=823804&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
(original)
+++
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Sat Oct 10 06:28:56 2009
@@ -419,80 +419,83 @@
_running = false;
}
- /**
- * Run a job in the current thread.
- *
- * @param job job to run.
- */
- protected void runJob(final Job job) {
- final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId,
job.detail,
- (Integer)(job.detail.get("retry") != null ?
job.detail.get("retry") : 0));
+ class RunJob implements Callable<Void> {
+ final Job job;
+ final JobProcessor processor;
- _exec.submit(new Callable<Void>() {
- public Void call() throws Exception {
- try {
- if (job.transacted) {
- final Object[] needRetry = new Object[] { false };
- try {
- execTransaction(new Callable<Void>() {
- public Void call() throws Exception {
- if (job.persisted)
- if (!_db.deleteJob(job.jobId, _nodeId))
- throw new
JobNoLongerInDbException(job.jobId,_nodeId);
- try {
- _jobProcessor.onScheduledJob(jobInfo);
- } catch (JobProcessorException jpe) {
- if (jpe.retry) {
- needRetry[0] = true;
- } else {
- __log.error("Error while
processing transaction, no retry.", jpe);
+ RunJob(Job job, JobProcessor processor) {
+ this.job = job;
+ this.processor = processor;
+ }
+
+ public Void call() throws Exception {
+ try {
+ final Scheduler.JobInfo jobInfo = new
Scheduler.JobInfo(job.jobId, job.detail,
+ (Integer) (job.detail.get("retry") != null ?
job.detail.get("retry") : 0));
+ if (job.transacted) {
+ final boolean[] needRetry = new boolean[]{false};
+ try {
+ execTransaction(new Callable<Void>() {
+ public Void call() throws Exception {
+ if (job.persisted)
+ if (!_db.deleteJob(job.jobId, _nodeId))
+ throw new
JobNoLongerInDbException(job.jobId, _nodeId);
+ try {
+ processor.onScheduledJob(jobInfo);
+ // If the job is a "runnable" job,
schedule the next job occurence
+ if (job.detail.get("runnable") != null &&
!"COMPLETED".equals(String.valueOf(jobInfo.jobDetail.get("runnable_status")))) {
+ // the runnable is still in progress,
schedule checker to 10 mins later
+ if (_pollIntervalForPolledRunnable <
0) {
+ if (__log.isWarnEnabled())
+ __log.warn("The poll interval
for polled runnables is negative; setting it to 1000ms");
+ _pollIntervalForPolledRunnable =
1000;
}
- // Let execTransaction know that shit
happened.
- throw jpe;
+ job.schedDate =
System.currentTimeMillis() + _pollIntervalForPolledRunnable;
+ _db.insertJob(job, _nodeId, false);
}
- return null;
- }
- });
- } catch (JobNoLongerInDbException jde) {
- // This may happen if two node try to do the same
job... we try to avoid
- // it the synchronization is a best-effort but not
perfect.
- __log.debug("job no longer in db forced
rollback.");
- } catch (final Exception ex) {
- // We only get here if the above execTransaction
fails, so that transaction got
- // rollbacked already
- execTransaction(new Callable<Void>() {
- public Void call() throws Exception {
- if ((Boolean)needRetry[0]) {
- int retry = job.detail.get("retry") !=
null ? (((Integer)job.detail.get("retry")) + 1) : 0;
- if (retry <= 10) {
- long delay = doRetry(job);
- __log.error("Error while
processing transaction, retrying in " + delay + "s");
- } else {
- __log.error("Error while
processing transaction after 10 retries, no more retries:"+job);
- }
+ } catch (JobProcessorException jpe) {
+ if (jpe.retry) {
+ needRetry[0] = true;
+ } else {
+ __log.error("Error while processing
transaction, no retry.", jpe);
}
-
- // We got rollbacked, as we schedule a
retry we want to be sure the original kob is
- // really deleted
- if (job.persisted)
_db.deleteJob(job.jobId, _nodeId);
-
- __log.error("Error while executing
transaction", ex);
- return null;
+ // Let execTransaction know that shit
happened.
+ throw jpe;
}
- });
- }
- } else {
- _jobProcessor.onScheduledJob(jobInfo);
+ return null;
+ }
+ });
+ } catch (JobNoLongerInDbException jde) {
+ // This may happen if two node try to do the same
job... we try to avoid
+ // it the synchronization is a best-effort but not
perfect.
+ __log.debug("job no longer in db forced rollback.");
+ } catch (final Exception ex) {
+ __log.error("Error while executing transaction", ex);
+
+ // We only get here if the above execTransaction
fails, so that transaction got
+ // rollbacked already
+ execTransaction(new Retry(job, needRetry[0]));
}
- return null;
- } finally {
- _outstandingJobs.remove(job.jobId);
+ } else {
+ processor.onScheduledJob(jobInfo);
}
+ return null;
+ } finally {
+ _outstandingJobs.remove(job.jobId);
}
- });
+ }
}
-
+
/**
+ * Run a job in the current thread.
+ *
+ * @param job job to run.
+ */
+ protected void runJob(final Job job) {
+ _exec.submit(new RunJob(job, _jobProcessor));
+ }
+
+ /**
* Run a job from a polled runnable thread. The runnable is not persistent,
* however, the poller is persistent and wakes up every given interval to
* check the status of the runnable.
@@ -507,7 +510,7 @@
* <li>5. System powered off and restarts; the poller job does not know
what the status
* of the runnable. This is handled just like the case #1.</li>
* </ul>
- *
+ * <p/>
* There is at least one re-scheduling of the poller job. Since, the
runnable's state is
* not persisted, and the same runnable may be tried again after system
failure,
* the runnable that's used with this polling should be repeatable.
@@ -515,58 +518,45 @@
* @param job job to run.
*/
protected void runPolledRunnable(final Job job) {
- final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId,
job.detail,
- (Integer)(job.detail.get("retry") != null ?
job.detail.get("retry") : 0));
+ _exec.submit(new RunJob(job, _polledRunnableProcessor));
+ }
- _exec.submit(new Callable<Void>() {
- public Void call() throws Exception {
- try {
- execTransaction(new Callable<Void>() {
- public Void call() throws Exception {
- if (!_db.deleteJob(job.jobId, _nodeId))
- throw new
JobNoLongerInDbException(job.jobId,_nodeId);
-
- try {
-
_polledRunnableProcessor.onScheduledJob(jobInfo);
- if(
!"COMPLETED".equals(String.valueOf(jobInfo.jobDetail.get("runnable_status"))) )
{
- // the runnable is still in progress,
schedule checker to 10 mins later
- if( _pollIntervalForPolledRunnable < 0 ) {
- if(__log.isWarnEnabled())
__log.warn("The poll interval for polled runnables is negative; setting it to
1000ms");
- _pollIntervalForPolledRunnable = 1000;
- }
- job.schedDate = System.currentTimeMillis()
+ _pollIntervalForPolledRunnable;
- _db.insertJob(job, _nodeId, false);
- }
- } catch (JobProcessorException jpe) {
- if (jpe.retry) {
- int retry = job.detail.get("retry") !=
null ? (((Integer)job.detail.get("retry")) + 1) : 0;
- if (retry <= 10) {
- long delay = doRetry(job);
- __log.error("Error while processing
transaction, retrying in " + delay + "s");
- } else {
- __log.error("Error while processing
transaction after 10 retries, no more retries:"+job);
- }
- } else {
- __log.error("Error while processing
transaction, no retry.", jpe);
- }
- // Let execTransaction know that shit happened.
- throw jpe;
- }
- return null;
- }
- });
- } catch (JobNoLongerInDbException jde) {
- // This may happen if two node try to do the same job...
we try to avoid
- // it the synchronization is a best-effort but not perfect.
- __log.debug("job no longer in db forced rollback.");
- } catch (Exception ex) {
- __log.error("Error while executing transaction", ex);
- } finally {
- _outstandingJobs.remove(job.jobId);
+ class Retry implements Callable<Void> {
+ final Job job;
+ final boolean needRetry;
+
+ Retry(Job job, boolean needRetry) {
+ this.job = job;
+ this.needRetry = needRetry;
+ }
+
+ public Void call() throws Exception {
+ if (needRetry) {
+ int retry = job.detail.get("retry") != null ? (((Integer)
job.detail.get("retry")) + 1) : 0;
+ if (retry <= 10) {
+ long delay = doRetry(job);
+ __log.error("Error while processing transaction, retrying
in " + delay + "s");
+ } else {
+ __log.error("Error while processing transaction after 10
retries, no more retries:" + job);
}
- return null;
}
- });
+
+ // We got rollbacked, as we schedule a retry we want to be sure
the original job is
+ // really deleted
+ if (job.persisted) _db.deleteJob(job.jobId, _nodeId);
+
+ return null;
+ }
+
+ private long doRetry(Job job) throws DatabaseException {
+ int retry = job.detail.get("retry") != null ?
(((Integer)job.detail.get("retry")) + 1) : 0;
+ job.detail.put("retry", retry);
+ long delay = (long)(Math.pow(5, retry));
+ Job jobRetry = new Job(System.currentTimeMillis() + delay*1000,
true, job.detail);
+ _db.insertJob(jobRetry, _nodeId, false);
+ return delay;
+ }
+
}
private void addTodoOnCommit(final Job job) {
@@ -755,15 +745,6 @@
}
- private long doRetry(Job job) throws DatabaseException {
- int retry = job.detail.get("retry") != null ?
(((Integer)job.detail.get("retry")) + 1) : 0;
- job.detail.put("retry", retry);
- long delay = (long)(Math.pow(5, retry));
- Job jobRetry = new Job(System.currentTimeMillis() + delay*1000, true,
job.detail);
- _db.insertJob(jobRetry, _nodeId, false);
- return delay;
- }
-
private abstract class SchedulerTask extends Task implements Runnable {
SchedulerTask(long schedDate) {
super(schedDate);