Author: mszefler
Date: Thu Aug  2 10:59:19 2007
New Revision: 562208

URL: http://svn.apache.org/viewvc?view=rev&rev=562208
Log:
BART tweaks

Modified:
    
ode/branches/bart/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java

Modified: 
ode/branches/bart/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: 
http://svn.apache.org/viewvc/ode/branches/bart/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?view=diff&rev=562208&r1=562207&r2=562208
==============================================================================
--- 
ode/branches/bart/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
 (original)
+++ 
ode/branches/bart/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
 Thu Aug  2 10:59:19 2007
@@ -9,14 +9,10 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.transaction.Status;
 import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 
@@ -26,24 +22,21 @@
 import org.apache.ode.bpel.iapi.Scheduler;
 
 /**
- * A reliable and relatively simple scheduler that uses a database to persist 
information about 
- * scheduled tasks.
+ * A reliable and relatively simple scheduler that uses a database to persist 
information about scheduled tasks.
  * 
- * The challange is to achieve high performance in a small memory footprint 
without loss of reliability
- * while supporting distributed/clustered configurations.
+ * The challange is to achieve high performance in a small memory footprint 
without loss of reliability while supporting
+ * distributed/clustered configurations.
  * 
- * The design is based around three time horizons: "immediate", "near future", 
and "everything else". 
- * Immediate jobs (i.e. jobs that are about to be up) are written to the 
database and kept in
- * an in-memory priority queue. When they execute, they are removed from the 
database. Near future
- * jobs are placed in the database and assigned to the current node, however 
they are not stored in
- * memory. Periodically jobs are "upgraded" from near-future to immediate 
status, at which point they
- * get loaded into memory. Jobs that are further out in time, are placed in 
the database without a 
- * node identifer; when they are ready to be "upgraded" to near-future jobs 
they are assigned to one
- * of the known live nodes. Recovery is rather straighforward, with stale node 
identifiers being 
- * reassigned to known good nodes.       
+ * The design is based around three time horizons: "immediate", "near future", 
and "everything else". Immediate jobs (i.e. jobs that
+ * are about to be up) are written to the database and kept in an in-memory 
priority queue. When they execute, they are removed from
+ * the database. Near future jobs are placed in the database and assigned to 
the current node, however they are not stored in
+ * memory. Periodically jobs are "upgraded" from near-future to immediate 
status, at which point they get loaded into memory. Jobs
+ * that are further out in time, are placed in the database without a node 
identifer; when they are ready to be "upgraded" to
+ * near-future jobs they are assigned to one of the known live nodes. Recovery 
is rather straighforward, with stale node identifiers
+ * being reassigned to known good nodes.
  * 
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- *
+ * 
  */
 public class SimpleScheduler implements Scheduler, TaskRunner {
     private static final Log __log = LogFactory.getLog(SimpleScheduler.class);
@@ -65,8 +58,6 @@
 
     TransactionManager _txm;
 
-    ExecutorService _exec;
-
     String _nodeId;
 
     /** Maximum number of jobs in the "near future" / todo queue. */
@@ -95,7 +86,6 @@
 
     private Random _random = new Random();
 
-
     public SimpleScheduler(String nodeId, DatabaseDelegate del) {
         _nodeId = nodeId;
         _db = del;
@@ -126,22 +116,10 @@
         _db = dbd;
     }
 
-    public void setExecutorService(ExecutorService executorService) {
-        _exec = executorService;
-    }
-
     public void cancelJob(String jobId) throws ContextException {
         // TODO: maybe later, not really necessary.
     }
 
-    public <T> Future<T> execIsolatedTransaction(final Callable<T> 
transaction) throws Exception, ContextException {
-        return _exec.submit(new Callable<T>() {
-            public T call() throws Exception {
-                return execTransaction(transaction);
-            }
-        });
-    }
-
     public <T> T execTransaction(Callable<T> transaction) throws Exception, 
ContextException {
         try {
             _txm.begin();
@@ -164,24 +142,6 @@
         }
     }
 
-    public void registerSynchronizer(final Synchronizer synch) throws 
ContextException {
-        try {
-            _txm.getTransaction().registerSynchronization(new 
Synchronization() {
-
-                public void beforeCompletion() {
-                    synch.beforeCompletion();
-                }
-
-                public void afterCompletion(int status) {
-                    synch.afterCompletion(status == Status.STATUS_COMMITTED);
-                }
-
-            });
-        } catch (Exception e) {
-            throw new ContextException("Unable to register synchronizer.", e);
-        }
-    }
-
     public String schedulePersistedJob(final Map<String, Object> jobDetail, 
Date when) throws ContextException {
         long ctime = System.currentTimeMillis();
         if (when == null)
@@ -247,9 +207,6 @@
         if (_running)
             return;
 
-        if (_exec == null)
-            _exec = Executors.newCachedThreadPool();
-
         _todo.clearTasks(UpgradeJobsTask.class);
         _todo.clearTasks(LoadImmediateTask.class);
         _todo.clearTasks(CheckStaleNodes.class);
@@ -298,64 +255,77 @@
         _running = false;
     }
 
+    public void jobCompleted(String jobId) {
+        boolean deleted = false;
+        try {
+            deleted = _db.deleteJob(jobId, _nodeId);
+        } catch (DatabaseException de) {
+            String errmsg = "Database error.";
+            __log.error(errmsg, de);
+            throw new ContextException(errmsg, de);
+        }
+        
+        if (!deleted) {
+            try {
+                _txm.getTransaction().setRollbackOnly();
+            } catch (Exception ex) {
+                __log.error("Transaction manager error; setRollbackOnly() 
failed.", ex);
+            }
+            
+            throw new ContextException("Job no longer in database: jobId=" + 
jobId);
+        }
+    }
+
+
     /**
      * Run a job in the current thread.
-     *
+     * 
      * @param job
      *            job to run.
      */
     protected void runJob(final Job job) {
         final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, 
job.detail, 0);
 
-        _exec.submit(new Callable<Void>() {
-            public Void call() throws Exception {
-                if (job.transacted) {
-                    try {
-                        execTransaction(new Callable<Void>() {
-                            public Void call() throws Exception {
-                                _jobProcessor.onScheduledJob(jobInfo);
-                                if (job.persisted)
-                                    if (!_db.deleteJob(job.jobId, _nodeId))
-                                        throw new 
JobNoLongerInDbException(job.jobId,_nodeId);
-                                return null;
-                            }
-                        });
-                    } catch (JobNoLongerInDbException jde) {
-                        // This may happen if two node try to do the same 
job... we try to avoid
-                        // it the synchronization is a best-effort but not 
perfect.
-                        __log.debug("job no longer in db forced rollback.");
-                    } catch (Exception ex) {
-                        __log.error("Error while executing transaction", ex);
-                    }
-                } else {
-                    _jobProcessor.onScheduledJob(jobInfo);
-                }
-                return null;
-            }
-        });
+        try {
+            _jobProcessor.onScheduledJob(jobInfo);
+        } catch (Exception ex) {
+            __log.error("Error in scheduler processor.", ex);
+        }
+
     }
 
     private void addTodoOnCommit(final Job job) {
-        registerSynchronizer(new Synchronizer() {
 
-            public void afterCompletion(boolean success) {
-                if (success) {
-                    _todo.enqueue(job);
+        Transaction tx;
+        try {
+            tx = _txm.getTransaction();
+        } catch (Exception ex) {
+            String errmsg = "Transaction manager error; unable to obtain 
transaction.";
+            __log.error(errmsg, ex);
+            throw new ContextException(errmsg, ex);
+        }
+
+        if (tx == null)
+            throw new ContextException("Missing required transaction in thread 
" + Thread.currentThread());
+
+        try {
+            tx.registerSynchronization(new Synchronization() {
+
+                public void afterCompletion(int status) {
+                    if (status == Status.STATUS_COMMITTED) {
+                        _todo.enqueue(job);
+                    }
                 }
-            }
 
-            public void beforeCompletion() {
-            }
+                public void beforeCompletion() {
+                }
 
-        });
-    }
+            });
 
-    public boolean isTransacted() {
-        try {
-            Transaction tx = _txm.getTransaction();
-            return (tx != null && tx.getStatus() != 
Status.STATUS_NO_TRANSACTION);
-        } catch (SystemException e) {
-            throw new ContextException("Internal Error: Could not obtain 
transaction status.");
+        } catch (Exception e) {
+            String errmsg = "Unable to registrer synchronizer. ";
+            __log.error(errmsg, e);
+            throw new ContextException(errmsg, e);
         }
     }
 
@@ -441,6 +411,7 @@
 
     /**
      * Re-assign stale node's jobs to self.
+     * 
      * @param nodeId
      */
     void recoverStaleNode(final String nodeId) {
@@ -500,8 +471,9 @@
 
     /**
      * Upgrade jobs from far future to immediate future (basically, assign 
them to a node).
+     * 
      * @author mszefler
-     *
+     * 
      */
     private class UpgradeJobsTask extends SchedulerTask {
 
@@ -554,8 +526,6 @@
             }
         }
 
-
     }
-
 
 }


Reply via email to