Author: mszefler
Date: Thu Aug 2 10:59:19 2007
New Revision: 562208
URL: http://svn.apache.org/viewvc?view=rev&rev=562208
Log:
BART tweaks
Modified:
ode/branches/bart/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Modified:
ode/branches/bart/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL:
http://svn.apache.org/viewvc/ode/branches/bart/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?view=diff&rev=562208&r1=562207&r2=562208
==============================================================================
---
ode/branches/bart/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
(original)
+++
ode/branches/bart/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
Thu Aug 2 10:59:19 2007
@@ -9,14 +9,10 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicLong;
import javax.transaction.Status;
import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -26,24 +22,21 @@
import org.apache.ode.bpel.iapi.Scheduler;
/**
- * A reliable and relatively simple scheduler that uses a database to persist
information about
- * scheduled tasks.
+ * A reliable and relatively simple scheduler that uses a database to persist
information about scheduled tasks.
*
- * The challange is to achieve high performance in a small memory footprint
without loss of reliability
- * while supporting distributed/clustered configurations.
+ * The challenge is to achieve high performance in a small memory footprint
without loss of reliability while supporting
+ * distributed/clustered configurations.
*
- * The design is based around three time horizons: "immediate", "near future",
and "everything else".
- * Immediate jobs (i.e. jobs that are about to be up) are written to the
database and kept in
- * an in-memory priority queue. When they execute, they are removed from the
database. Near future
- * jobs are placed in the database and assigned to the current node, however
they are not stored in
- * memory. Periodically jobs are "upgraded" from near-future to immediate
status, at which point they
- * get loaded into memory. Jobs that are further out in time, are placed in
the database without a
- * node identifer; when they are ready to be "upgraded" to near-future jobs
they are assigned to one
- * of the known live nodes. Recovery is rather straighforward, with stale node
identifiers being
- * reassigned to known good nodes.
+ * The design is based around three time horizons: "immediate", "near future",
and "everything else". Immediate jobs (i.e. jobs that
+ * are about to be up) are written to the database and kept in an in-memory
priority queue. When they execute, they are removed from
+ * the database. Near future jobs are placed in the database and assigned to
the current node, however they are not stored in
+ * memory. Periodically jobs are "upgraded" from near-future to immediate
status, at which point they get loaded into memory. Jobs
+ * that are further out in time, are placed in the database without a node
identifier; when they are ready to be "upgraded" to
+ * near-future jobs they are assigned to one of the known live nodes. Recovery
is rather straightforward, with stale node identifiers
+ * being reassigned to known good nodes.
*
* @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- *
+ *
*/
public class SimpleScheduler implements Scheduler, TaskRunner {
private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
@@ -65,8 +58,6 @@
TransactionManager _txm;
- ExecutorService _exec;
-
String _nodeId;
/** Maximum number of jobs in the "near future" / todo queue. */
@@ -95,7 +86,6 @@
private Random _random = new Random();
-
public SimpleScheduler(String nodeId, DatabaseDelegate del) {
_nodeId = nodeId;
_db = del;
@@ -126,22 +116,10 @@
_db = dbd;
}
- public void setExecutorService(ExecutorService executorService) {
- _exec = executorService;
- }
-
public void cancelJob(String jobId) throws ContextException {
// TODO: maybe later, not really necessary.
}
- public <T> Future<T> execIsolatedTransaction(final Callable<T>
transaction) throws Exception, ContextException {
- return _exec.submit(new Callable<T>() {
- public T call() throws Exception {
- return execTransaction(transaction);
- }
- });
- }
-
public <T> T execTransaction(Callable<T> transaction) throws Exception,
ContextException {
try {
_txm.begin();
@@ -164,24 +142,6 @@
}
}
- public void registerSynchronizer(final Synchronizer synch) throws
ContextException {
- try {
- _txm.getTransaction().registerSynchronization(new
Synchronization() {
-
- public void beforeCompletion() {
- synch.beforeCompletion();
- }
-
- public void afterCompletion(int status) {
- synch.afterCompletion(status == Status.STATUS_COMMITTED);
- }
-
- });
- } catch (Exception e) {
- throw new ContextException("Unable to register synchronizer.", e);
- }
- }
-
public String schedulePersistedJob(final Map<String, Object> jobDetail,
Date when) throws ContextException {
long ctime = System.currentTimeMillis();
if (when == null)
@@ -247,9 +207,6 @@
if (_running)
return;
- if (_exec == null)
- _exec = Executors.newCachedThreadPool();
-
_todo.clearTasks(UpgradeJobsTask.class);
_todo.clearTasks(LoadImmediateTask.class);
_todo.clearTasks(CheckStaleNodes.class);
@@ -298,64 +255,77 @@
_running = false;
}
+ public void jobCompleted(String jobId) {
+ boolean deleted = false;
+ try {
+ deleted = _db.deleteJob(jobId, _nodeId);
+ } catch (DatabaseException de) {
+ String errmsg = "Database error.";
+ __log.error(errmsg, de);
+ throw new ContextException(errmsg, de);
+ }
+
+ if (!deleted) {
+ try {
+ _txm.getTransaction().setRollbackOnly();
+ } catch (Exception ex) {
+ __log.error("Transaction manager error; setRollbackOnly()
failed.", ex);
+ }
+
+ throw new ContextException("Job no longer in database: jobId=" +
jobId);
+ }
+ }
+
+
/**
* Run a job in the current thread.
- *
+ *
* @param job
* job to run.
*/
protected void runJob(final Job job) {
final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId,
job.detail, 0);
- _exec.submit(new Callable<Void>() {
- public Void call() throws Exception {
- if (job.transacted) {
- try {
- execTransaction(new Callable<Void>() {
- public Void call() throws Exception {
- _jobProcessor.onScheduledJob(jobInfo);
- if (job.persisted)
- if (!_db.deleteJob(job.jobId, _nodeId))
- throw new
JobNoLongerInDbException(job.jobId,_nodeId);
- return null;
- }
- });
- } catch (JobNoLongerInDbException jde) {
- // This may happen if two node try to do the same
job... we try to avoid
- // it the synchronization is a best-effort but not
perfect.
- __log.debug("job no longer in db forced rollback.");
- } catch (Exception ex) {
- __log.error("Error while executing transaction", ex);
- }
- } else {
- _jobProcessor.onScheduledJob(jobInfo);
- }
- return null;
- }
- });
+ try {
+ _jobProcessor.onScheduledJob(jobInfo);
+ } catch (Exception ex) {
+ __log.error("Error in scheduler processor.", ex);
+ }
+
}
private void addTodoOnCommit(final Job job) {
- registerSynchronizer(new Synchronizer() {
- public void afterCompletion(boolean success) {
- if (success) {
- _todo.enqueue(job);
+ Transaction tx;
+ try {
+ tx = _txm.getTransaction();
+ } catch (Exception ex) {
+ String errmsg = "Transaction manager error; unable to obtain
transaction.";
+ __log.error(errmsg, ex);
+ throw new ContextException(errmsg, ex);
+ }
+
+ if (tx == null)
+ throw new ContextException("Missing required transaction in thread
" + Thread.currentThread());
+
+ try {
+ tx.registerSynchronization(new Synchronization() {
+
+ public void afterCompletion(int status) {
+ if (status == Status.STATUS_COMMITTED) {
+ _todo.enqueue(job);
+ }
}
- }
- public void beforeCompletion() {
- }
+ public void beforeCompletion() {
+ }
- });
- }
+ });
- public boolean isTransacted() {
- try {
- Transaction tx = _txm.getTransaction();
- return (tx != null && tx.getStatus() !=
Status.STATUS_NO_TRANSACTION);
- } catch (SystemException e) {
- throw new ContextException("Internal Error: Could not obtain
transaction status.");
+ } catch (Exception e) {
+ String errmsg = "Unable to registrer synchronizer. ";
+ __log.error(errmsg, e);
+ throw new ContextException(errmsg, e);
}
}
@@ -441,6 +411,7 @@
/**
* Re-assign stale node's jobs to self.
+ *
* @param nodeId
*/
void recoverStaleNode(final String nodeId) {
@@ -500,8 +471,9 @@
/**
* Upgrade jobs from far future to immediate future (basically, assign
them to a node).
+ *
* @author mszefler
- *
+ *
*/
private class UpgradeJobsTask extends SchedulerTask {
@@ -554,8 +526,6 @@
}
}
-
}
-
}