Author: boisvert
Date: Sat Nov 15 09:02:04 2008
New Revision: 717879
URL: http://svn.apache.org/viewvc?rev=717879&view=rev
Log:
Apparently the Scheduler contract has changed since the 1.x branch; reverting
changeset 717872
Modified:
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
Modified:
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
---
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
(original)
+++
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
Sat Nov 15 09:02:04 2008
@@ -51,10 +51,10 @@
private static final String UPDATE_REASSIGN = "update ODE_JOB set nodeid =
?, scheduled = 0 where nodeid = ?";
- private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set
nodeid = ? where nodeid is null "
+ private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set
nodeid = ? where nodeid is null and scheduled = 0 "
+ "and mod(ts,?) = ? and ts < ?";
- private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set
nodeid = ? where nodeid is null "
+ private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set
nodeid = ? where nodeid is null and scheduled = 0 "
+ "and (ts % ?) = ? and ts < ?";
private static final String SAVE_JOB = "insert into ODE_JOB "
@@ -63,7 +63,7 @@
private static final String GET_NODEIDS = "select distinct nodeid from
ODE_JOB";
private static final String SCHEDULE_IMMEDIATE = "select jobid, ts,
transacted, scheduled, details from ODE_JOB "
- + "where nodeid = ? and ts < ? order by ts";
+ + "where nodeid = ? and scheduled = 0 and ts < ? order by ts";
private static final String UPDATE_SCHEDULED = "update ODE_JOB set
scheduled = 1 where jobid in (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
@@ -179,6 +179,24 @@
}
rs.close();
ps.close();
+
+ // mark jobs as scheduled, UPDATE_SCHEDULED_SLOTS at a time
+ int j = 0;
+ int updateCount = 0;
+ ps = con.prepareStatement(UPDATE_SCHEDULED);
+ for (int updates = 1; updates <= (ret.size() /
UPDATE_SCHEDULED_SLOTS) + 1; updates++) {
+ for (int i = 1; i <= UPDATE_SCHEDULED_SLOTS; i++) {
+ ps.setString(i, j < ret.size() ? ret.get(j).jobId : "");
+ j++;
+ }
+ ps.execute();
+ updateCount += ps.getUpdateCount();
+ }
+ if (updateCount != ret.size()) {
+ throw new DatabaseException(
+ "Updating scheduled jobs failed to update all jobs;
expected=" + ret.size()
+ + " actual=" + updateCount);
+ }
} catch (SQLException se) {
throw new DatabaseException(se);
} finally {
@@ -286,7 +304,7 @@
d = Dialect.SQLSERVER;
} else if (dbProductName.indexOf("MySQL") >= 0) {
d = Dialect.MYSQL;
- } else if (dbProductName.indexOf("Sybase") >= 0 ||
dbProductName.indexOf("Adaptive") >= 0) {
+ } else if (dbProductName.indexOf("Sybase") >= 0) {
d = Dialect.SYBASE;
}
}
Modified:
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
---
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
(original)
+++
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
Sat Nov 15 09:02:04 2008
@@ -25,7 +25,7 @@
/**
* Like a task, but a little bit better.
- *
+ *
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
*/
class Job extends Task {
@@ -37,7 +37,7 @@
public Job(long when, boolean transacted, Map<String, Object> jobDetail) {
this(when, new GUID().toString(),transacted,jobDetail);
}
-
+
public Job(long when, String jobId, boolean transacted,Map<String, Object>
jobDetail) {
super(when);
this.jobId = jobId;
@@ -54,9 +54,5 @@
public boolean equals(Object obj) {
return obj instanceof Job && jobId.equals(((Job) obj).jobId);
}
-
- @Override
- public String toString() {
- return "Job "+jobId+" transacted: "+transacted+" persisted:
"+persisted+" details: "+detail;
- }
+
}
Modified:
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
---
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
(original)
+++
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
Sat Nov 15 09:02:04 2008
@@ -31,167 +31,168 @@
/**
* Implements the "todo" queue and prioritized scheduling mechanism.
- *
+ *
* @author mszefler
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- *
+ *
*/
class SchedulerThread implements Runnable {
- private static final Log __log =
LogFactory.getLog(SchedulerThread.class);
+ private static final Log __log = LogFactory.getLog(SchedulerThread.class);
- private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
+ private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
- /** Jobs ready for immediate execution. */
- private PriorityBlockingQueue<Task> _todo;
+ /** Jobs ready for immediate execution. */
+ private PriorityBlockingQueue<Task> _todo;
- /** Lock for managing the queue */
- private ReentrantLock _lock = new ReentrantLock();
-
- private Condition _activity = _lock.newCondition();
-
- private volatile boolean _done;
-
- private TaskRunner _taskrunner;
-
- private Thread _thread;
-
- SchedulerThread(TaskRunner runner) {
- _todo = new
PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
- new JobComparatorByDate());
- _taskrunner = runner;
- }
-
- void start() {
- if (_thread != null)
- return;
-
- _done = false;
- _thread = new Thread(this, "OdeScheduler");
- _thread.start();
- }
-
- /**
- * Shutdown the thread.
- */
- void stop() {
- if (_thread == null)
- return;
-
- _done = true;
- _lock.lock();
- try {
- _activity.signal();
- } finally {
- _lock.unlock();
-
- }
-
- while (_thread != null)
- try {
- _thread.join();
- _thread = null;
- } catch (InterruptedException e) {
- ;
- }
-
- }
-
- /**
- * Add a job to the todo queue.
- *
- * @param job
- */
- void enqueue(Task task) {
- _lock.lock();
- try {
- _todo.add(task);
- _activity.signal();
- } finally {
- _lock.unlock();
- }
- }
-
- /**
- * Remove a job to the todo queue.
- *
- * @param job
- */
- void dequeue(Task task) {
- _lock.lock();
- try {
- _todo.remove(task);
- _activity.signal();
- } finally {
- _lock.unlock();
- }
- }
-
- /**
- * Get the size of the todo queue.
- *
- * @return
- */
- public int size() {
- return _todo.size();
- }
-
- /**
- * Pop items off the todo queue, and send them to the task runner for
processing.
- */
- public void run() {
- while (!_done) {
- _lock.lock();
- try {
- long nextjob;
- while ((nextjob = nextJobTime()) > 0 && !_done)
- _activity.await(nextjob,
TimeUnit.MILLISECONDS);
-
- if (!_done && nextjob == 0) {
- Task task = _todo.take();
- _taskrunner.runTask(task);
-
- }
- } catch (InterruptedException ex) {
- ; // ignore
- } finally {
- _lock.unlock();
- }
- }
- }
-
- /**
- * Calculate the time until the next available job.
- *
- * @return time until next job, 0 if one is one is scheduled to go, and
some
- * really large number if there are no jobs to speak of
- */
- private long nextJobTime() {
- assert _lock.isLocked();
-
- Task job = _todo.peek();
- if (job == null)
- return Long.MAX_VALUE;
-
- return Math.max(0, job.schedDate - System.currentTimeMillis());
- }
-
- /**
- * Remove the tasks of a given type from the list.
- * @param tasktype type of task
- */
- public void clearTasks(final Class<? extends Task> tasktype) {
- _lock.lock();
- try {
- CollectionsX.remove_if(_todo, new
MemberOfFunction<Task>() {
- @Override
- public boolean isMember(Task o) {
- return
tasktype.isAssignableFrom(o.getClass());
- }
-
- });
- } finally {
- _lock.unlock();
- }
- }
+ /** Lock for managing the queue */
+ private ReentrantLock _lock = new ReentrantLock();
+
+ private Condition _activity = _lock.newCondition();
+
+ private volatile boolean _done;
+
+ private TaskRunner _taskrunner;
+
+ private Thread _thread;
+
+ SchedulerThread(TaskRunner runner) {
+ _todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
+ new JobComparatorByDate());
+ _taskrunner = runner;
+ }
+
+ void start() {
+ if (_thread != null)
+ return;
+
+ _done = false;
+ _thread = new Thread(this, "OdeScheduler");
+ _thread.start();
+ }
+
+ /**
+ * Shutdown the thread.
+ */
+ void stop() {
+ if (_thread == null)
+ return;
+
+ _done = true;
+ _lock.lock();
+ try {
+ _activity.signal();
+ } finally {
+ _lock.unlock();
+
+ }
+
+ while (_thread != null)
+ try {
+ _thread.join();
+ _thread = null;
+ } catch (InterruptedException e) {
+ ;
+ }
+
+ }
+
+ /**
+ * Add a job to the todo queue.
+ *
+ * @param job
+ */
+ void enqueue(Task task) {
+ _lock.lock();
+ try {
+ _todo.add(task);
+ _activity.signal();
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+ /**
+ * Remove a job to the todo queue.
+ *
+ * @param job
+ */
+ void dequeue(Task task) {
+ _lock.lock();
+ try {
+ _todo.remove(task);
+ _activity.signal();
+ } finally {
+ _lock.unlock();
+ }
+ }
+
+
+ /**
+ * Get the size of the todo queue.
+ *
+ * @return
+ */
+ public int size() {
+ return _todo.size();
+ }
+
+ /**
+ * Pop items off the todo queue, and send them to the task runner for
processing.
+ */
+ public void run() {
+ while (!_done) {
+ _lock.lock();
+ try {
+ long nextjob;
+ while ((nextjob = nextJobTime()) > 0 && !_done)
+ _activity.await(nextjob, TimeUnit.MILLISECONDS);
+
+ if (!_done && nextjob == 0) {
+ Task task = _todo.take();
+ _taskrunner.runTask(task);
+
+ }
+ } catch (InterruptedException ex) {
+ ; // ignore
+ } finally {
+ _lock.unlock();
+ }
+ }
+ }
+
+ /**
+ * Calculate the time until the next available job.
+ *
+ * @return time until next job, 0 if one is one is scheduled to go, and
some
+ * really large number if there are no jobs to speak of
+ */
+ private long nextJobTime() {
+ assert _lock.isLocked();
+
+ Task job = _todo.peek();
+ if (job == null)
+ return Long.MAX_VALUE;
+
+ return Math.max(0, job.schedDate - System.currentTimeMillis());
+ }
+
+ /**
+ * Remove the tasks of a given type from the list.
+ * @param tasktype type of task
+ */
+ public void clearTasks(final Class<? extends Task> tasktype) {
+ _lock.lock();
+ try {
+ CollectionsX.remove_if(_todo, new MemberOfFunction<Task>() {
+ @Override
+ public boolean isMember(Task o) {
+ return tasktype.isAssignableFrom(o.getClass());
+ }
+
+ });
+ } finally {
+ _lock.unlock();
+ }
+ }
}
Modified:
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
---
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
(original)
+++
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Sat Nov 15 09:02:04 2008
@@ -23,14 +23,10 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.Status;
import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -40,22 +36,19 @@
import org.apache.ode.bpel.iapi.Scheduler;
/**
- * A reliable and relatively simple scheduler that uses a database to persist
information about
- * scheduled tasks.
- *
- * The challenge is to achieve high performance in a small memory footprint
without loss of reliability
- * while supporting distributed/clustered configurations.
- *
- * The design is based around three time horizons: "immediate", "near future",
and "everything else".
- * Immediate jobs (i.e. jobs that are about to be up) are written to the
database and kept in
- * an in-memory priority queue. When they execute, they are removed from the
database. Near future
- * jobs are placed in the database and assigned to the current node, however
they are not stored in
- * memory. Periodically jobs are "upgraded" from near-future to immediate
status, at which point they
- * get loaded into memory. Jobs that are further out in time, are placed in
the database without a
- * node identifer; when they are ready to be "upgraded" to near-future jobs
they are assigned to one
- * of the known live nodes. Recovery is rather straighforward, with stale node
identifiers being
- * reassigned to known good nodes.
- *
+ * A reliable and relatively simple scheduler that uses a database to persist
information about scheduled tasks.
+ *
+ * The challange is to achieve high performance in a small memory footprint
without loss of reliability while supporting
+ * distributed/clustered configurations.
+ *
+ * The design is based around three time horizons: "immediate", "near future",
and "everything else". Immediate jobs (i.e. jobs that
+ * are about to be up) are written to the database and kept in an in-memory
priority queue. When they execute, they are removed from
+ * the database. Near future jobs are placed in the database and assigned to
the current node, however they are not stored in
+ * memory. Periodically jobs are "upgraded" from near-future to immediate
status, at which point they get loaded into memory. Jobs
+ * that are further out in time, are placed in the database without a node
identifer; when they are ready to be "upgraded" to
+ * near-future jobs they are assigned to one of the known live nodes. Recovery
is rather straighforward, with stale node identifiers
+ * being reassigned to known good nodes.
+ *
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
*
*/
@@ -69,7 +62,7 @@
long _immediateInterval = 30000;
/**
- * Jobs scheduled with a time that is between
(now+immediateInterval,now+nearFutureInterval) will be assigned to the current
+ * Jobs sccheduled with a time that is between
(now+immediateInterval,now+nearFutureInterval) will be assigned to the current
* node, but will not be placed on the todo queue (the promoter will pick
them up).
*/
long _nearFutureInterval = 10 * 60 * 1000;
@@ -77,17 +70,8 @@
/** 10s of no communication and you are deemed dead. */
long _staleInterval = 10000;
- /**
- * Estimated sustained transaction per second capacity of the system.
- * e.g. 100 means the system can process 100 jobs per seconds, on average
- * This number is used to determine how many jobs to load from the
database at once.
- */
- int _tps = 100;
-
TransactionManager _txm;
- ExecutorService _exec;
-
String _nodeId;
/** Maximum number of jobs in the "near future" / todo queue. */
@@ -119,26 +103,10 @@
public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties
conf) {
_nodeId = nodeId;
_db = del;
- _todoLimit = getIntProperty(conf, "ode.scheduler.queueLength",
_todoLimit);
- _immediateInterval = getLongProperty(conf,
"ode.scheduler.immediateInterval", _immediateInterval);
- _nearFutureInterval = getLongProperty(conf,
"ode.scheduler.nearFutureInterval", _nearFutureInterval);
- _staleInterval = getLongProperty(conf, "ode.scheduler.staleInterval",
_staleInterval);
- _tps = getIntProperty(conf, "ode.scheduler.transactionsPerSecond",
_tps);
+ _todoLimit =
Integer.parseInt(conf.getProperty("ode.scheduler.queueLength", "10000"));
_todo = new SchedulerThread(this);
}
- private int getIntProperty(Properties props, String propName, int
defaultValue) {
- String s = props.getProperty(propName);
- if (s != null) return Integer.parseInt(s);
- else return defaultValue;
- }
-
- private long getLongProperty(Properties props, String propName, long
defaultValue) {
- String s = props.getProperty(propName);
- if (s != null) return Long.parseLong(s);
- else return defaultValue;
- }
-
public void setNodeId(String nodeId) {
_nodeId = nodeId;
}
@@ -155,10 +123,6 @@
_nearFutureInterval = nearFutureInterval;
}
- public void setTransactionsPerSecond(int tps) {
- _tps = tps;
- }
-
public void setTransactionManager(TransactionManager txm) {
_txm = txm;
}
@@ -167,10 +131,6 @@
_db = dbd;
}
- public void setExecutorService(ExecutorService executorService) {
- _exec = executorService;
- }
-
public void cancelJob(String jobId) throws ContextException {
_todo.dequeue(new Job(0, jobId, false, null));
try {
@@ -181,20 +141,6 @@
}
}
- public <T> Future<T> execIsolatedTransaction(final Callable<T>
transaction) throws Exception, ContextException {
- return _exec.submit(new Callable<T>() {
- public T call() throws Exception {
- try {
- return execTransaction(transaction);
- } catch (Exception e) {
- __log.error("An exception occured while executing an
isolated transaction, " +
- "the transaction is going to be abandoned.", e);
- return null;
- }
- }
- });
- }
-
public <T> T execTransaction(Callable<T> transaction) throws Exception,
ContextException {
try {
if (__log.isDebugEnabled()) __log.debug("Beginning a new
transaction");
@@ -221,24 +167,6 @@
}
}
- public void registerSynchronizer(final Synchronizer synch) throws
ContextException {
- try {
- _txm.getTransaction().registerSynchronization(new
Synchronization() {
-
- public void beforeCompletion() {
- synch.beforeCompletion();
- }
-
- public void afterCompletion(int status) {
- synch.afterCompletion(status == Status.STATUS_COMMITTED);
- }
-
- });
- } catch (Exception e) {
- throw new ContextException("Unable to register synchronizer.", e);
- }
- }
-
public String schedulePersistedJob(final Map<String, Object> jobDetail,
Date when) throws ContextException {
long ctime = System.currentTimeMillis();
if (when == null)
@@ -254,20 +182,25 @@
try {
if (immediate) {
+ // If we have too many jobs in the queue, we don't allow any
new ones
+ if (_todo.size() > _todoLimit) {
+ __log.error("The execution queue is backed up, the engine
can't keep up with the load. Either " +
+ "increase the queue size or regulate the flow.");
+ return null;
+ }
+
// Immediate scheduling means we put it in the DB for safe
keeping
_db.insertJob(job, _nodeId, true);
-
// And add it to our todo list .
- if (_todo.size() < _todoLimit) {
- addTodoOnCommit(job);
- }
+ addTodoOnCommit(job);
+
__log.debug("scheduled immediate job: " + job.jobId);
} else if (nearfuture) {
// Near future, assign the job to ourselves (why? -- this
makes it very unlikely that we
// would get two nodes trying to process the same instance,
which causes unsightly rollbacks).
_db.insertJob(job, _nodeId, false);
__log.debug("scheduled near-future job: " + job.jobId);
- } else /* far future */ {
+ } else /* far future */{
// Not the near future, we don't assign a node-id, we'll
assign it later.
_db.insertJob(job, null, false);
__log.debug("scheduled far-future job: " + job.jobId);
@@ -302,9 +235,6 @@
if (_running)
return;
- if (_exec == null)
- _exec = Executors.newCachedThreadPool();
-
_todo.clearTasks(UpgradeJobsTask.class);
_todo.clearTasks(LoadImmediateTask.class);
_todo.clearTasks(CheckStaleNodes.class);
@@ -325,28 +255,23 @@
throw new ContextException("Error retrieving node list.", ex);
}
- long now = System.currentTimeMillis();
-
// Pretend we got a heartbeat...
- for (String s : _knownNodes) _lastHeartBeat.put(s, now);
+ for (String s : _knownNodes)
+ _lastHeartBeat.put(s, System.currentTimeMillis());
// schedule immediate job loading for now!
- _todo.enqueue(new LoadImmediateTask(now));
+ _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis()));
// schedule check for stale nodes, make it random so that the nodes
don't overlap.
- _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+ _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + (long)
(_random.nextDouble() * _staleInterval)));
// do the upgrade sometime (random) in the immediate interval.
- _todo.enqueue(new UpgradeJobsTask(now +
randomMean(_immediateInterval)));
+ _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + (long)
(_random.nextDouble() * _immediateInterval)));
_todo.start();
_running = true;
}
-
- private long randomMean(long mean) {
- return (long) _random.nextDouble() * mean + (mean/2);
- }
-
+
public synchronized void stop() {
if (!_running)
return;
@@ -358,6 +283,28 @@
_running = false;
}
+ public void jobCompleted(String jobId) {
+ boolean deleted = false;
+ try {
+ deleted = _db.deleteJob(jobId, _nodeId);
+ } catch (DatabaseException de) {
+ String errmsg = "Database error.";
+ __log.error(errmsg, de);
+ throw new ContextException(errmsg, de);
+ }
+
+ if (!deleted) {
+ try {
+ _txm.getTransaction().setRollbackOnly();
+ } catch (Exception ex) {
+ __log.error("Transaction manager error; setRollbackOnly()
failed.", ex);
+ }
+
+ throw new ContextException("Job no longer in database: jobId=" +
jobId);
+ }
+ }
+
+
/**
* Run a job in the current thread.
*
@@ -368,70 +315,53 @@
final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId,
job.detail,
(Integer)(job.detail.get("retry") != null ?
job.detail.get("retry") : 0));
- _exec.submit(new Callable<Void>() {
- public Void call() throws Exception {
- if (job.transacted) {
- try {
- execTransaction(new Callable<Void>() {
- public Void call() throws Exception {
- if (job.persisted)
- if (!_db.deleteJob(job.jobId, _nodeId))
- throw new
JobNoLongerInDbException(job.jobId,_nodeId);
-
- try {
- _jobProcessor.onScheduledJob(jobInfo);
- } catch (JobProcessorException jpe) {
- if (jpe.retry) {
- int retry = job.detail.get("retry") !=
null ? (((Integer)job.detail.get("retry")) + 1) : 0;
- if (retry <= 10) {
- long delay = doRetry(job);
- __log.error("Error while
processing transaction, retrying in " + delay + "s");
- } else {
- __log.error("Error while
processing transaction after 10 retries, no more retries:"+job);
- }
- } else {
- __log.error("Error while processing
transaction, no retry.", jpe);
- }
- }
- return null;
- }
- });
- } catch (JobNoLongerInDbException jde) {
- // This may happen if two node try to do the same
job... we try to avoid
- // it the synchronization is a best-effort but not
perfect.
- __log.debug("job no longer in db forced rollback.");
- } catch (Exception ex) {
- __log.error("Error while executing transaction", ex);
- }
- } else {
- _jobProcessor.onScheduledJob(jobInfo);
- }
- return null;
+ try {
+ try {
+ _jobProcessor.onScheduledJob(jobInfo);
+ } catch (JobProcessorException jpe) {
+ if (jpe.retry)
+ __log.error("Error while processing transaction, retrying
in " + doRetry(job) + "s");
+ else
+ __log.error("Error while processing transaction, no
retry.", jpe);
}
- });
+ } catch (Exception ex) {
+ __log.error("Error in scheduler processor.", ex);
+ }
+
}
private void addTodoOnCommit(final Job job) {
- registerSynchronizer(new Synchronizer() {
- public void afterCompletion(boolean success) {
- if (success) {
- _todo.enqueue(job);
+ Transaction tx;
+ try {
+ tx = _txm.getTransaction();
+ } catch (Exception ex) {
+ String errmsg = "Transaction manager error; unable to obtain
transaction.";
+ __log.error(errmsg, ex);
+ throw new ContextException(errmsg, ex);
+ }
+
+ if (tx == null)
+ throw new ContextException("Missing required transaction in thread
" + Thread.currentThread());
+
+ try {
+ tx.registerSynchronization(new Synchronization() {
+
+ public void afterCompletion(int status) {
+ if (status == Status.STATUS_COMMITTED) {
+ _todo.enqueue(job);
+ }
}
- }
- public void beforeCompletion() {
- }
+ public void beforeCompletion() {
+ }
- });
- }
+ });
- public boolean isTransacted() {
- try {
- Transaction tx = _txm.getTransaction();
- return (tx != null && tx.getStatus() !=
Status.STATUS_NO_TRANSACTION);
- } catch (SystemException e) {
- throw new ContextException("Internal Error: Could not obtain
transaction status.");
+ } catch (Exception e) {
+ String errmsg = "Unable to registrer synchronizer. ";
+ __log.error(errmsg, e);
+ throw new ContextException(errmsg, e);
}
}
@@ -455,25 +385,21 @@
boolean doLoadImmediate() {
__log.debug("LOAD IMMEDIATE started");
-
- // don't load anything if we're already half-full; we've got plenty
to do already
- if (_todo.size() > _todoLimit/2) return true;
-
List<Job> jobs;
try {
- final int batch = (int) (_immediateInterval * _tps / 1000);
- jobs = execTransaction(new Callable<List<Job>>() {
- public List<Job> call() throws Exception {
- return _db.dequeueImmediate(_nodeId,
System.currentTimeMillis() + _immediateInterval, batch);
- }
- });
- for (Job j : jobs) {
- if (__log.isDebugEnabled())
- __log.debug("todo.enqueue job from db: " + j.jobId + " for
" + j.schedDate);
+ do {
+ jobs = execTransaction(new Callable<List<Job>>() {
+ public List<Job> call() throws Exception {
+ return _db.dequeueImmediate(_nodeId,
System.currentTimeMillis() + _immediateInterval, 10);
+ }
+ });
+ for (Job j : jobs) {
+ if (__log.isDebugEnabled())
+ __log.debug("todo.enqueue job from db: " + j.jobId + "
for " + j.schedDate);
- if (_todo.size() < _todoLimit)
_todo.enqueue(j);
- }
+ }
+ } while (jobs.size() == 10);
return true;
} catch (Exception ex) {
__log.error("Error loading immediate jobs from database.", ex);
@@ -521,6 +447,7 @@
/**
* Re-assign stale node's jobs to self.
+ *
* @param nodeId
*/
void recoverStaleNode(final String nodeId) {
@@ -549,7 +476,6 @@
} finally {
__log.debug("node recovery complete");
}
-
}
private long doRetry(Job job) throws DatabaseException {
@@ -581,9 +507,9 @@
success = doLoadImmediate();
} finally {
if (success)
- _todo.enqueue(new
LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval *
.90)));
+ _todo.enqueue(new
LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval *
.75)));
else
- _todo.enqueue(new
LoadImmediateTask(System.currentTimeMillis() + 1000));
+ _todo.enqueue(new
LoadImmediateTask(System.currentTimeMillis() + 100));
}
}
@@ -591,6 +517,7 @@
/**
* Upgrade jobs from far future to immediate future (basically, assign
them to a node).
+ *
* @author mszefler
*
*/
@@ -617,7 +544,7 @@
try {
success = doUpgrade();
} finally {
- long future = System.currentTimeMillis() + (success ? (long)
(_nearFutureInterval * .50) : 1000);
+ long future = System.currentTimeMillis() + (success ? (long)
(_nearFutureInterval * .50) : 100);
_nextUpgrade.set(future);
_todo.enqueue(new UpgradeJobsTask(future));
__log.debug("UPGRADE completed, success = " + success + ";
next time in " + (future - ctime) + "ms");
@@ -640,16 +567,11 @@
__log.debug("CHECK STALE NODES started");
for (String nodeId : _knownNodes) {
Long lastSeen = _lastHeartBeat.get(nodeId);
- if ((lastSeen == null || (System.currentTimeMillis() -
lastSeen) > _staleInterval)
- && !_nodeId.equals(nodeId))
- {
+ if (lastSeen == null || (System.currentTimeMillis() -
lastSeen) > _staleInterval)
recoverStaleNode(nodeId);
- }
}
}
-
}
-
}
Modified:
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
---
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
(original)
+++
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
Sat Nov 15 09:02:04 2008
@@ -43,7 +43,7 @@
_ds = new DelegateSupport();
_del = _ds.delegate();
}
-
+
public void testGetNodeIds() throws Exception {
// should have no node ids in the db, empty list (not null)
@@ -91,9 +91,8 @@
assertEquals("j1",jobs.get(0).jobId);
jobs = _del.dequeueImmediate("n1", 250L, 1000);
assertNotNull(jobs);
- assertEquals(2, jobs.size());
- assertEquals("j1",jobs.get(0).jobId);
- assertEquals("j2",jobs.get(1).jobId);
+ assertEquals(1, jobs.size());
+ assertEquals("j2",jobs.get(0).jobId);
}
public void testScheduleImmediateMaxRows() throws Exception {
@@ -104,6 +103,10 @@
assertNotNull(jobs);
assertEquals(1, jobs.size());
assertEquals("j1",jobs.get(0).jobId);
+ jobs = _del.dequeueImmediate("n1", 250L, 1000);
+ assertNotNull(jobs);
+ assertEquals(1, jobs.size());
+ assertEquals("j2",jobs.get(0).jobId);
}
public void testScheduleImmediateNodeFilter() throws Exception {
Modified:
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
---
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
(original)
+++
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
Sat Nov 15 09:02:04 2008
@@ -51,7 +51,7 @@
public void onScheduledJob(Scheduler.JobInfo jobInfo) throws
Scheduler.JobProcessorException {
_tried++;
- throw new Scheduler.JobProcessorException(jobInfo.retryCount < 3);
+ throw new Scheduler.JobProcessorException(jobInfo.retryCount < 2);
}
Map<String, Object> newDetail(String x) {
Modified:
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
---
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
(original)
+++
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
Sat Nov 15 09:02:04 2008
@@ -52,7 +52,7 @@
_st.start();
long schedtime = System.currentTimeMillis() + 300;
_st.enqueue(new Task(schedtime));
- Thread.sleep(1000);
+ Thread.sleep(600);
assertEquals(1,_tasks.size());
assertTrue(_tasks.get(0).time < schedtime + SCHED_TOLERANCE / 2);
assertTrue(_tasks.get(0).time > schedtime - SCHED_TOLERANCE / 2);
Modified:
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
URL:
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
---
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
(original)
+++
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
Sat Nov 15 09:02:04 2008
@@ -21,34 +21,29 @@
import java.util.*;
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
import javax.transaction.TransactionManager;
import junit.framework.TestCase;
+import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
public class SimpleSchedulerTest extends TestCase implements JobProcessor {
DelegateSupport _ds;
SimpleScheduler _scheduler;
ArrayList<JobInfo> _jobs;
- ArrayList<JobInfo> _commit;
TransactionManager _txm;
+
public void setUp() throws Exception {
_txm = new GeronimoTransactionManager();
_ds = new DelegateSupport();
_scheduler = newScheduler("n1");
_jobs = new ArrayList<JobInfo>(100);
- _commit = new ArrayList<JobInfo>(100);
}
public void tearDown() throws Exception {
@@ -57,25 +52,21 @@
public void testConcurrentExec() throws Exception {
_scheduler.start();
- for (int i=0; i<10; i++) {
- _txm.begin();
- String jobId;
- try {
- int jobs = _jobs.size();
- jobId = _scheduler.schedulePersistedJob(newDetail("123"), new
Date(System.currentTimeMillis() + 200));
- Thread.sleep(100);
- // Make sure we don't schedule until commit.
- assertEquals(jobs, _jobs.size());
- } finally {
- _txm.commit();
- }
- // Delete from DB
- assertEquals(true,_ds.delegate().deleteJob(jobId, "n1"));
- // Wait for the job to be execed.
- Thread.sleep(250);
- // We should always have same number of jobs/commits
- assertEquals(_jobs.size(), _commit.size());
+ _txm.begin();
+ String jobId;
+ try {
+ jobId = _scheduler.schedulePersistedJob(newDetail("123"), new
Date(System.currentTimeMillis() + 100));
+ Thread.sleep(200);
+ // Make sure we don't schedule until commit.
+ assertEquals(0, _jobs.size());
+ } finally {
+ _txm.commit();
}
+ // Wait for the job to be execed.
+ Thread.sleep(100);
+ // Should execute job,
+ assertEquals(1, _jobs.size());
+
}
public void testImmediateScheduling() throws Exception {
@@ -116,124 +107,102 @@
public void testNearFutureScheduling() throws Exception {
// speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(10000);
- _scheduler.setImmediateInterval(5000);
+ _scheduler.setNearFutureInterval(1000);
+ _scheduler.setImmediateInterval(500);
_scheduler.start();
_txm.begin();
try {
- _scheduler.schedulePersistedJob(newDetail("123"), new
Date(System.currentTimeMillis() + 7500));
+ _scheduler.schedulePersistedJob(newDetail("123"), new
Date(System.currentTimeMillis() + 750));
} finally {
_txm.commit();
}
- Thread.sleep(8500);
+ Thread.sleep(850);
assertEquals(1, _jobs.size());
}
public void testFarFutureScheduling() throws Exception {
// speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(7000);
- _scheduler.setImmediateInterval(3000);
+ _scheduler.setNearFutureInterval(700);
+ _scheduler.setImmediateInterval(300);
_scheduler.start();
_txm.begin();
try {
- _scheduler.schedulePersistedJob(newDetail("123"), new
Date(System.currentTimeMillis() + 7500));
+ _scheduler.schedulePersistedJob(newDetail("123"), new
Date(System.currentTimeMillis() + 750));
} finally {
_txm.commit();
}
- Thread.sleep(8500);
+ Thread.sleep(850);
assertEquals(1, _jobs.size());
}
public void testRecovery() throws Exception {
// speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(2000);
- _scheduler.setImmediateInterval(1000);
- _scheduler.setStaleInterval(500);
+ _scheduler.setNearFutureInterval(200);
+ _scheduler.setImmediateInterval(100);
+ _scheduler.setStaleInterval(50);
_txm.begin();
try {
_scheduler.schedulePersistedJob(newDetail("immediate"), new
Date(System.currentTimeMillis()));
- _scheduler.schedulePersistedJob(newDetail("near"), new
Date(System.currentTimeMillis() + 1100));
- _scheduler.schedulePersistedJob(newDetail("far"), new
Date(System.currentTimeMillis() + 2500));
+ _scheduler.schedulePersistedJob(newDetail("near"), new
Date(System.currentTimeMillis() + 110));
+ _scheduler.schedulePersistedJob(newDetail("far"), new
Date(System.currentTimeMillis() + 250));
} finally {
_txm.commit();
}
_scheduler = newScheduler("n3");
- _scheduler.setNearFutureInterval(2000);
- _scheduler.setImmediateInterval(1000);
- _scheduler.setStaleInterval(1000);
+ _scheduler.setNearFutureInterval(200);
+ _scheduler.setImmediateInterval(100);
+ _scheduler.setStaleInterval(50);
_scheduler.start();
- Thread.sleep(4000);
+ Thread.sleep(400);
assertEquals(3, _jobs.size());
}
public void testRecoverySuppressed() throws Exception {
// speed things up a bit to hit the right code paths
- _scheduler.setNearFutureInterval(2000);
- _scheduler.setImmediateInterval(1000);
- _scheduler.setStaleInterval(500);
+ _scheduler.setNearFutureInterval(200);
+ _scheduler.setImmediateInterval(100);
+ _scheduler.setStaleInterval(50);
+ // schedule some jobs ...
_txm.begin();
try {
_scheduler.schedulePersistedJob(newDetail("immediate"), new
Date(System.currentTimeMillis()));
- _scheduler.schedulePersistedJob(newDetail("near"), new
Date(System.currentTimeMillis() + 1100));
- _scheduler.schedulePersistedJob(newDetail("far"), new
Date(System.currentTimeMillis() + 15000));
+ _scheduler.schedulePersistedJob(newDetail("near"), new
Date(System.currentTimeMillis() + 150));
+ _scheduler.schedulePersistedJob(newDetail("far"), new
Date(System.currentTimeMillis() + 250));
} finally {
_txm.commit();
- }
- _scheduler.stop();
+ }
- _scheduler = newScheduler("n3");
- _scheduler.setNearFutureInterval(2000);
- _scheduler.setImmediateInterval(1000);
- _scheduler.setStaleInterval(1000);
- _scheduler.start();
+ // but don't start the scheduler....
+
+ // create a second node for the scheduler.
+ SimpleScheduler scheduler = newScheduler("n3");
+ scheduler.setNearFutureInterval(200);
+ scheduler.setImmediateInterval(100);
+ scheduler.setStaleInterval(50);
+ scheduler.start();
for (int i = 0; i < 40; ++i) {
- _scheduler.updateHeartBeat("n1");
- Thread.sleep(100);
+ scheduler.updateHeartBeat("n1");
+ Thread.sleep(10);
}
- _scheduler.stop();
- Thread.sleep(1000);
+ scheduler.stop();
- assertEquals(0, _jobs.size());
+ assertTrue(_jobs.size() <= 1);
+ if (_jobs.size() == 1)
+ assertEquals("far", _jobs.get(0).jobDetail.get("foo"));
}
public void onScheduledJob(final JobInfo jobInfo) throws
JobProcessorException {
synchronized (_jobs) {
_jobs.add(jobInfo);
}
-
- try {
- _txm.getTransaction().registerSynchronization(new
Synchronization() {
-
- public void afterCompletion(int arg0) {
- if (arg0 == Status.STATUS_COMMITTED)
- _commit.add(jobInfo);
- }
-
- public void beforeCompletion() {
- // TODO Auto-generated method stub
-
- }
-
- });
- } catch (IllegalStateException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (RollbackException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- } catch (SystemException e) {
- // TODO Auto-generated catch block
- e.printStackTrace();
- }
-
}
Map<String, Object> newDetail(String x) {