Author: midon
Date: Sat Oct 10 06:28:56 2009
New Revision: 823804

URL: http://svn.apache.org/viewvc?rev=823804&view=rev
Log:
Clean up the code to avoid confusing duplication and apply the same
retry policy to all job types

Modified:
    
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Modified: 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=823804&r1=823803&r2=823804&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
 Sat Oct 10 06:28:56 2009
@@ -419,80 +419,83 @@
         _running = false;
     }
 
-    /**
-     * Run a job in the current thread.
-     *
-     * @param job job to run.
-     */
-    protected void runJob(final Job job) {
-        final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, 
job.detail,
-                (Integer)(job.detail.get("retry") != null ? 
job.detail.get("retry") : 0));
+    class RunJob implements Callable<Void> {
+        final Job job;
+        final JobProcessor processor;
 
-        _exec.submit(new Callable<Void>() {
-            public Void call() throws Exception {
-                try {
-                    if (job.transacted) {
-                        final Object[] needRetry = new Object[] { false };
-                        try {
-                            execTransaction(new Callable<Void>() {
-                                public Void call() throws Exception {
-                                    if (job.persisted)
-                                        if (!_db.deleteJob(job.jobId, _nodeId))
-                                            throw new 
JobNoLongerInDbException(job.jobId,_nodeId);
-                                    try {
-                                        _jobProcessor.onScheduledJob(jobInfo);
-                                    } catch (JobProcessorException jpe) {
-                                        if (jpe.retry) {
-                                            needRetry[0] = true;
-                                        } else {
-                                            __log.error("Error while 
processing transaction, no retry.", jpe);
+        RunJob(Job job, JobProcessor processor) {
+            this.job = job;
+            this.processor = processor;
+        }
+
+        public Void call() throws Exception {
+            try {
+                final Scheduler.JobInfo jobInfo = new 
Scheduler.JobInfo(job.jobId, job.detail,
+                        (Integer) (job.detail.get("retry") != null ? 
job.detail.get("retry") : 0));
+                if (job.transacted) {
+                    final boolean[] needRetry = new boolean[]{false};
+                    try {
+                        execTransaction(new Callable<Void>() {
+                            public Void call() throws Exception {
+                                if (job.persisted)
+                                    if (!_db.deleteJob(job.jobId, _nodeId))
+                                        throw new 
JobNoLongerInDbException(job.jobId, _nodeId);
+                                try {
+                                    processor.onScheduledJob(jobInfo);
+                                    // If the job is a "runnable" job, 
schedule the next job occurrence
+                                    if (job.detail.get("runnable") != null && 
!"COMPLETED".equals(String.valueOf(jobInfo.jobDetail.get("runnable_status")))) {
+                                        // the runnable is still in progress, 
schedule the checker again after the poll interval
+                                        if (_pollIntervalForPolledRunnable < 
0) {
+                                            if (__log.isWarnEnabled())
+                                                __log.warn("The poll interval 
for polled runnables is negative; setting it to 1000ms");
+                                            _pollIntervalForPolledRunnable = 
1000;
                                         }
-                                        // Let execTransaction know that shit 
happened.
-                                        throw jpe;
+                                        job.schedDate = 
System.currentTimeMillis() + _pollIntervalForPolledRunnable;
+                                        _db.insertJob(job, _nodeId, false);
                                     }
-                                    return null;
-                                }
-                            });
-                        } catch (JobNoLongerInDbException jde) {
-                            // This may happen if two node try to do the same 
job... we try to avoid
-                            // it the synchronization is a best-effort but not 
perfect.
-                            __log.debug("job no longer in db forced 
rollback.");
-                        } catch (final Exception ex) {
-                            // We only get here if the above execTransaction 
fails, so that transaction got
-                            // rollbacked already
-                            execTransaction(new Callable<Void>() {
-                                public Void call() throws Exception {
-                                    if ((Boolean)needRetry[0]) {
-                                        int retry = job.detail.get("retry") != 
null ? (((Integer)job.detail.get("retry")) + 1) : 0;
-                                        if (retry <= 10) {
-                                            long delay = doRetry(job);
-                                            __log.error("Error while 
processing transaction, retrying in " + delay + "s");
-                                        } else {
-                                            __log.error("Error while 
processing transaction after 10 retries, no more retries:"+job);
-                                        }
+                                } catch (JobProcessorException jpe) {
+                                    if (jpe.retry) {
+                                        needRetry[0] = true;
+                                    } else {
+                                        __log.error("Error while processing 
transaction, no retry.", jpe);
                                     }
-
-                                    // We got rollbacked, as we schedule a 
retry we want to be sure the original kob is
-                                    // really deleted
-                                    if (job.persisted) 
_db.deleteJob(job.jobId, _nodeId);
-
-                                    __log.error("Error while executing 
transaction", ex);
-                                    return null;
+                                    // Let execTransaction know that the job 
failed.
+                                    throw jpe;
                                 }
-                            });
-                        }
-                    } else {
-                        _jobProcessor.onScheduledJob(jobInfo);
+                                return null;
+                            }
+                        });
+                    } catch (JobNoLongerInDbException jde) {
+                        // This may happen if two nodes try to do the same 
job... we try to avoid
+                        // it, but the synchronization is best-effort and not 
perfect.
+                        __log.debug("job no longer in db forced rollback.");
+                    } catch (final Exception ex) {
+                        __log.error("Error while executing transaction", ex);
+
+                        // We only get here if the above execTransaction 
fails, so that transaction has
+                        // already been rolled back
+                        execTransaction(new Retry(job, needRetry[0]));
                     }
-                    return null;
-                } finally {
-                    _outstandingJobs.remove(job.jobId);
+                } else {
+                    processor.onScheduledJob(jobInfo);
                 }
+                return null;
+            } finally {
+                _outstandingJobs.remove(job.jobId);
             }
-        });
+        }
     }
-
+    
     /**
+     * Run a job by submitting it to the executor.
+     *
+     * @param job job to run.
+     */
+    protected void runJob(final Job job) {
+        _exec.submit(new RunJob(job, _jobProcessor));
+    }
+
+     /**
      * Run a job from a polled runnable thread. The runnable is not persistent,
      * however, the poller is persistent and wakes up every given interval to
      * check the status of the runnable.
@@ -507,7 +510,7 @@
      * <li>5. System powered off and restarts; the poller job does not know 
what the status
      * of the runnable. This is handled just like the case #1.</li>
      * </ul>
-     *
+     * <p/>
      * There is at least one re-scheduling of the poller job. Since, the 
runnable's state is
      * not persisted, and the same runnable may be tried again after system 
failure,
      * the runnable that's used with this polling should be repeatable.
@@ -515,58 +518,45 @@
      * @param job job to run.
      */
     protected void runPolledRunnable(final Job job) {
-        final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, 
job.detail,
-                (Integer)(job.detail.get("retry") != null ? 
job.detail.get("retry") : 0));
+         _exec.submit(new RunJob(job, _polledRunnableProcessor));
+    }
 
-        _exec.submit(new Callable<Void>() {
-            public Void call() throws Exception {
-                try {
-                    execTransaction(new Callable<Void>() {
-                        public Void call() throws Exception {
-                            if (!_db.deleteJob(job.jobId, _nodeId))
-                                throw new 
JobNoLongerInDbException(job.jobId,_nodeId);
-
-                            try {
-                                
_polledRunnableProcessor.onScheduledJob(jobInfo);
-                                if( 
!"COMPLETED".equals(String.valueOf(jobInfo.jobDetail.get("runnable_status"))) ) 
{
-                                    // the runnable is still in progress, 
schedule checker to 10 mins later
-                                    if( _pollIntervalForPolledRunnable < 0 ) {
-                                        if(__log.isWarnEnabled()) 
__log.warn("The poll interval for polled runnables is negative; setting it to 
1000ms");
-                                        _pollIntervalForPolledRunnable = 1000;
-                                    }
-                                    job.schedDate = System.currentTimeMillis() 
+ _pollIntervalForPolledRunnable;
-                                    _db.insertJob(job, _nodeId, false);
-                                }
-                            } catch (JobProcessorException jpe) {
-                                if (jpe.retry) {
-                                    int retry = job.detail.get("retry") != 
null ? (((Integer)job.detail.get("retry")) + 1) : 0;
-                                    if (retry <= 10) {
-                                        long delay = doRetry(job);
-                                        __log.error("Error while processing 
transaction, retrying in " + delay + "s");
-                                    } else {
-                                        __log.error("Error while processing 
transaction after 10 retries, no more retries:"+job);
-                                    }
-                                } else {
-                                    __log.error("Error while processing 
transaction, no retry.", jpe);
-                                }
-                                // Let execTransaction know that shit happened.
-                                throw jpe;
-                            }
-                            return null;
-                        }
-                    });
-                } catch (JobNoLongerInDbException jde) {
-                    // This may happen if two node try to do the same job... 
we try to avoid
-                    // it the synchronization is a best-effort but not perfect.
-                    __log.debug("job no longer in db forced rollback.");
-                } catch (Exception ex) {
-                    __log.error("Error while executing transaction", ex);
-                } finally {
-                    _outstandingJobs.remove(job.jobId);
+    class Retry implements Callable<Void> {
+        final Job job;
+        final boolean needRetry;
+
+        Retry(Job job, boolean needRetry) {
+            this.job = job;
+            this.needRetry = needRetry;
+        }
+
+        public Void call() throws Exception {
+            if (needRetry) {
+                int retry = job.detail.get("retry") != null ? (((Integer) 
job.detail.get("retry")) + 1) : 0;
+                if (retry <= 10) {
+                    long delay = doRetry(job);
+                    __log.error("Error while processing transaction, retrying 
in " + delay + "s");
+                } else {
+                    __log.error("Error while processing transaction after 10 
retries, no more retries:" + job);
                 }
-                return null;
             }
-        });
+
+            // We got rolled back; as we schedule a retry, we want to be sure 
the original job is
+            // really deleted
+            if (job.persisted) _db.deleteJob(job.jobId, _nodeId);
+
+            return null;
+        }
+
+        private long doRetry(Job job) throws DatabaseException {
+          int retry = job.detail.get("retry") != null ? 
(((Integer)job.detail.get("retry")) + 1) : 0;
+          job.detail.put("retry", retry);
+          long delay = (long)(Math.pow(5, retry));
+          Job jobRetry = new Job(System.currentTimeMillis() + delay*1000, 
true, job.detail);
+          _db.insertJob(jobRetry, _nodeId, false);
+          return delay;
+        }
+
     }
 
     private void addTodoOnCommit(final Job job) {
@@ -755,15 +745,6 @@
 
     }
 
-    private long doRetry(Job job) throws DatabaseException {
-        int retry = job.detail.get("retry") != null ? 
(((Integer)job.detail.get("retry")) + 1) : 0;
-        job.detail.put("retry", retry);
-        long delay = (long)(Math.pow(5, retry));
-        Job jobRetry = new Job(System.currentTimeMillis() + delay*1000, true, 
job.detail);
-        _db.insertJob(jobRetry, _nodeId, false);
-        return delay;
-    }
-
     private abstract class SchedulerTask extends Task implements Runnable {
         SchedulerTask(long schedDate) {
             super(schedDate);


Reply via email to