Author: boisvert
Date: Tue Sep  8 23:45:34 2009
New Revision: 812729

URL: http://svn.apache.org/viewvc?rev=812729&view=rev
Log:
Add a map of outstanding jobs being dispatched; used to avoid cases where a job 
would be dispatched twice if the server is under high load

Modified:
    
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Modified: 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: 
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=812729&r1=812728&r2=812729&view=diff
==============================================================================
--- 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
 (original)
+++ 
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
 Tue Sep  8 23:45:34 2009
@@ -108,6 +108,11 @@
     /** When we last heard from our nodes. */
     private ConcurrentHashMap<String, Long> _lastHeartBeat = new 
ConcurrentHashMap<String, Long>();
 
+    /** Map of outstanding jobs (job id -> time enqueued), i.e., jobs that have 
been enqueued but not fully processed or cancelled yet.
+        Used to avoid cases where a job would be dispatched twice if the 
server is under high load and
+        does not fully process a job before it is reloaded from the database. 
*/
+    private ConcurrentHashMap<String, Long> _outstandingJobs = new 
ConcurrentHashMap<String, Long>();
+
     private boolean _running;
 
     /** Time for next upgrade. */
@@ -182,6 +187,7 @@
 
     public void cancelJob(String jobId) throws ContextException {
         _todo.dequeue(new Job(0, jobId, false, null));
+        _outstandingJobs.remove(jobId);
         try {
             _db.deleteJob(jobId, _nodeId);
         } catch (DatabaseException e) {
@@ -307,7 +313,6 @@
             throw new ContextException("Database error.", dbe);
         }
         return job.jobId;
-
     }
 
     public String scheduleVolatileJob(boolean transacted, Map<String, Object> 
jobDetail) throws ContextException {
@@ -338,6 +343,7 @@
         _todo.clearTasks(UpgradeJobsTask.class);
         _todo.clearTasks(LoadImmediateTask.class);
         _todo.clearTasks(CheckStaleNodes.class);
+        _outstandingJobs.clear();
 
         _knownNodes.clear();
 
@@ -385,6 +391,8 @@
         _todo.clearTasks(UpgradeJobsTask.class);
         _todo.clearTasks(LoadImmediateTask.class);
         _todo.clearTasks(CheckStaleNodes.class);
+        _outstandingJobs.clear();
+
         _running = false;
     }
 
@@ -399,45 +407,49 @@
 
         _exec.submit(new Callable<Void>() {
             public Void call() throws Exception {
-                if (job.transacted) {
-                    try {
-                        execTransaction(new Callable<Void>() {
-                            public Void call() throws Exception {
-                                if (job.persisted)
-                                    if (!_db.deleteJob(job.jobId, _nodeId))
-                                        throw new 
JobNoLongerInDbException(job.jobId,_nodeId);
-                                    
-                                try {
-                                    _jobProcessor.onScheduledJob(jobInfo);
-                                } catch (JobProcessorException jpe) {
-                                    if (jpe.retry) {
-                                        int retry = job.detail.get("retry") != 
null ? (((Integer)job.detail.get("retry")) + 1) : 0;
-                                        if (retry <= 10) {
-                                            long delay = doRetry(job);
-                                            __log.error("Error while 
processing transaction, retrying in " + delay + "s");
+                try {
+                    if (job.transacted) {
+                        try {
+                            execTransaction(new Callable<Void>() {
+                                public Void call() throws Exception {
+                                    if (job.persisted)
+                                        if (!_db.deleteJob(job.jobId, _nodeId))
+                                            throw new 
JobNoLongerInDbException(job.jobId,_nodeId);
+                                        
+                                    try {
+                                        _jobProcessor.onScheduledJob(jobInfo);
+                                    } catch (JobProcessorException jpe) {
+                                        if (jpe.retry) {
+                                            int retry = 
job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0;
+                                            if (retry <= 10) {
+                                                long delay = doRetry(job);
+                                                __log.error("Error while 
processing transaction, retrying in " + delay + "s");
+                                            } else {
+                                                __log.error("Error while 
processing transaction after 10 retries, no more retries:"+job);
+                                            }
                                         } else {
-                                            __log.error("Error while 
processing transaction after 10 retries, no more retries:"+job);
+                                            __log.error("Error while 
processing transaction, no retry.", jpe);
                                         }
-                                    } else {
-                                        __log.error("Error while processing 
transaction, no retry.", jpe);
+                                        // Let execTransaction know that shit 
happened.
+                                        throw jpe;
                                     }
-                                    // Let execTransaction know that shit 
happened.
-                                    throw jpe;
+                                    return null;
                                 }
-                                return null;
-                            }
-                        });
-                    } catch (JobNoLongerInDbException jde) {
-                        // This may happen if two node try to do the same 
job... we try to avoid
-                        // it the synchronization is a best-effort but not 
perfect.
-                        __log.debug("job no longer in db forced rollback.");
-                    } catch (Exception ex) {
-                        __log.error("Error while executing transaction", ex);
+                            });
+                        } catch (JobNoLongerInDbException jde) {
+                            // This may happen if two node try to do the same 
job... we try to avoid
+                            // it the synchronization is a best-effort but not 
perfect.
+                            __log.debug("job no longer in db forced 
rollback.");
+                        } catch (Exception ex) {
+                            __log.error("Error while executing transaction", 
ex);
+                        }
+                    } else {
+                        _jobProcessor.onScheduledJob(jobInfo);
                     }
-                } else {
-                    _jobProcessor.onScheduledJob(jobInfo);
+                    return null;
+                } finally {
+                     _outstandingJobs.remove(job.jobId);
                 }
-                return null;
             }
         });
     }
@@ -511,6 +523,8 @@
                     __log.debug("job no longer in db forced rollback.");
                 } catch (Exception ex) {
                     __log.error("Error while executing transaction", ex);
+                } finally {
+                    _outstandingJobs.remove(job.jobId);
                 }
                 return null;
             }
@@ -521,7 +535,7 @@
         registerSynchronizer(new Synchronizer() {
             public void afterCompletion(boolean success) {
                 if (success) {
-                    _todo.enqueue(job);
+                    enqueue(job);
                 }
             }
 
@@ -545,7 +559,7 @@
             if( job.detail.get("runnable") != null ) {
                 runPolledRunnable(job);
             } else {
-                runJob((Job) task);
+                runJob(job);
             }
         } else if (task instanceof SchedulerTask)
             ((SchedulerTask) task).run();
@@ -580,8 +594,10 @@
                 if (__log.isDebugEnabled())
                     __log.debug("todo.enqueue job from db: " + j.jobId + " for 
" + j.schedDate);
 
-                if (_todo.size() < _todoLimit) 
-                    _todo.enqueue(j);
+                if (_todo.size() >= _todoLimit)
+                    break;
+                
+                enqueue(j);
             }
             return true;
         } catch (Exception ex) {
@@ -592,6 +608,20 @@
         }
     }
 
+    void enqueue(Job job) {
+        Long outstanding = _outstandingJobs.get(job.jobId);
+        if (outstanding != null && 
System.currentTimeMillis()-outstanding.longValue() > 60*60*1000) {
+            __log.error("Stale outstanding job: "+job.jobId);
+            outstanding = null;
+        }
+        if (outstanding == null) {
+            _outstandingJobs.put(job.jobId, System.currentTimeMillis());
+            _todo.enqueue(job);
+        } else {
+            __log.info("Outstanding job: "+job.jobId);
+        }
+    }
+
     boolean doUpgrade() {
         __log.debug("UPGRADE started");
         final ArrayList<String> knownNodes = new 
ArrayList<String>(_knownNodes);


Reply via email to