Author: boisvert
Date: Tue Sep 8 23:45:34 2009
New Revision: 812729
URL: http://svn.apache.org/viewvc?rev=812729&view=rev
Log:
Add a map of outstanding jobs being dispatched; used to avoid cases where a job
would be dispatched twice if the server is under high load
Modified:
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Modified:
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL:
http://svn.apache.org/viewvc/ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=812729&r1=812728&r2=812729&view=diff
==============================================================================
---
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
(original)
+++
ode/branches/APACHE_ODE_1.X/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Tue Sep 8 23:45:34 2009
@@ -108,6 +108,11 @@
/** When we last heard from our nodes. */
private ConcurrentHashMap<String, Long> _lastHeartBeat = new
ConcurrentHashMap<String, Long>();
+ /** Set of outstanding jobs, i.e., jobs that have been enqueued but not
dequeued or dispatched yet.
+ Used to avoid cases where a job would be dispatched twice if the
server is under high load and
+ does not fully process a job before it is reloaded from the database.
*/
+ private ConcurrentHashMap<String, Long> _outstandingJobs = new
ConcurrentHashMap<String, Long>();
+
private boolean _running;
/** Time for next upgrade. */
@@ -182,6 +187,7 @@
public void cancelJob(String jobId) throws ContextException {
_todo.dequeue(new Job(0, jobId, false, null));
+ _outstandingJobs.remove(jobId);
try {
_db.deleteJob(jobId, _nodeId);
} catch (DatabaseException e) {
@@ -307,7 +313,6 @@
throw new ContextException("Database error.", dbe);
}
return job.jobId;
-
}
public String scheduleVolatileJob(boolean transacted, Map<String, Object>
jobDetail) throws ContextException {
@@ -338,6 +343,7 @@
_todo.clearTasks(UpgradeJobsTask.class);
_todo.clearTasks(LoadImmediateTask.class);
_todo.clearTasks(CheckStaleNodes.class);
+ _outstandingJobs.clear();
_knownNodes.clear();
@@ -385,6 +391,8 @@
_todo.clearTasks(UpgradeJobsTask.class);
_todo.clearTasks(LoadImmediateTask.class);
_todo.clearTasks(CheckStaleNodes.class);
+ _outstandingJobs.clear();
+
_running = false;
}
@@ -399,45 +407,49 @@
_exec.submit(new Callable<Void>() {
public Void call() throws Exception {
- if (job.transacted) {
- try {
- execTransaction(new Callable<Void>() {
- public Void call() throws Exception {
- if (job.persisted)
- if (!_db.deleteJob(job.jobId, _nodeId))
- throw new
JobNoLongerInDbException(job.jobId,_nodeId);
-
- try {
- _jobProcessor.onScheduledJob(jobInfo);
- } catch (JobProcessorException jpe) {
- if (jpe.retry) {
- int retry = job.detail.get("retry") !=
null ? (((Integer)job.detail.get("retry")) + 1) : 0;
- if (retry <= 10) {
- long delay = doRetry(job);
- __log.error("Error while
processing transaction, retrying in " + delay + "s");
+ try {
+ if (job.transacted) {
+ try {
+ execTransaction(new Callable<Void>() {
+ public Void call() throws Exception {
+ if (job.persisted)
+ if (!_db.deleteJob(job.jobId, _nodeId))
+ throw new
JobNoLongerInDbException(job.jobId,_nodeId);
+
+ try {
+ _jobProcessor.onScheduledJob(jobInfo);
+ } catch (JobProcessorException jpe) {
+ if (jpe.retry) {
+ int retry =
job.detail.get("retry") != null ? (((Integer)job.detail.get("retry")) + 1) : 0;
+ if (retry <= 10) {
+ long delay = doRetry(job);
+ __log.error("Error while
processing transaction, retrying in " + delay + "s");
+ } else {
+ __log.error("Error while
processing transaction after 10 retries, no more retries:"+job);
+ }
} else {
- __log.error("Error while
processing transaction after 10 retries, no more retries:"+job);
+ __log.error("Error while
processing transaction, no retry.", jpe);
}
- } else {
- __log.error("Error while processing
transaction, no retry.", jpe);
+ // Let execTransaction know that shit
happened.
+ throw jpe;
}
- // Let execTransaction know that shit
happened.
- throw jpe;
+ return null;
}
- return null;
- }
- });
- } catch (JobNoLongerInDbException jde) {
- // This may happen if two node try to do the same
job... we try to avoid
- // it the synchronization is a best-effort but not
perfect.
- __log.debug("job no longer in db forced rollback.");
- } catch (Exception ex) {
- __log.error("Error while executing transaction", ex);
+ });
+ } catch (JobNoLongerInDbException jde) {
+ // This may happen if two node try to do the same
job... we try to avoid
+ // it the synchronization is a best-effort but not
perfect.
+ __log.debug("job no longer in db forced
rollback.");
+ } catch (Exception ex) {
+ __log.error("Error while executing transaction",
ex);
+ }
+ } else {
+ _jobProcessor.onScheduledJob(jobInfo);
}
- } else {
- _jobProcessor.onScheduledJob(jobInfo);
+ return null;
+ } finally {
+ _outstandingJobs.remove(job.jobId);
}
- return null;
}
});
}
@@ -511,6 +523,8 @@
__log.debug("job no longer in db forced rollback.");
} catch (Exception ex) {
__log.error("Error while executing transaction", ex);
+ } finally {
+ _outstandingJobs.remove(job.jobId);
}
return null;
}
@@ -521,7 +535,7 @@
registerSynchronizer(new Synchronizer() {
public void afterCompletion(boolean success) {
if (success) {
- _todo.enqueue(job);
+ enqueue(job);
}
}
@@ -545,7 +559,7 @@
if( job.detail.get("runnable") != null ) {
runPolledRunnable(job);
} else {
- runJob((Job) task);
+ runJob(job);
}
} else if (task instanceof SchedulerTask)
((SchedulerTask) task).run();
@@ -580,8 +594,10 @@
if (__log.isDebugEnabled())
__log.debug("todo.enqueue job from db: " + j.jobId + " for
" + j.schedDate);
- if (_todo.size() < _todoLimit)
- _todo.enqueue(j);
+ if (_todo.size() >= _todoLimit)
+ break;
+
+ enqueue(j);
}
return true;
} catch (Exception ex) {
@@ -592,6 +608,20 @@
}
}
+ void enqueue(Job job) {
+ Long outstanding = _outstandingJobs.get(job.jobId);
+ if (outstanding != null &&
System.currentTimeMillis()-outstanding.longValue() > 60*60*1000) {
+ __log.error("Stale outstanding job: "+job.jobId);
+ outstanding = null;
+ }
+ if (outstanding == null) {
+ _outstandingJobs.put(job.jobId, System.currentTimeMillis());
+ _todo.enqueue(job);
+ } else {
+ __log.info("Outstanding job: "+job.jobId);
+ }
+ }
+
boolean doUpgrade() {
__log.debug("UPGRADE started");
final ArrayList<String> knownNodes = new
ArrayList<String>(_knownNodes);