Author: boisvert
Date: Sat Nov 15 09:02:04 2008
New Revision: 717879

URL: http://svn.apache.org/viewvc?rev=717879&view=rev
Log:
Apparently the Scheduler contract has changed from 1.x branch, reverting 
changeset 717872

Modified:
    
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
    
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
    
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
    
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
    
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
    
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
    
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
    
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java

Modified: 
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
--- 
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
 (original)
+++ 
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/JdbcDelegate.java
 Sat Nov 15 09:02:04 2008
@@ -51,10 +51,10 @@
 
     private static final String UPDATE_REASSIGN = "update ODE_JOB set nodeid = 
?, scheduled = 0 where nodeid = ?";
 
-    private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set 
nodeid = ? where nodeid is null "
+    private static final String UPGRADE_JOB_DEFAULT = "update ODE_JOB set 
nodeid = ? where nodeid is null and scheduled = 0 "
             + "and mod(ts,?) = ? and ts < ?";
 
-    private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set 
nodeid = ? where nodeid is null "
+    private static final String UPGRADE_JOB_SQLSERVER = "update ODE_JOB set 
nodeid = ? where nodeid is null and scheduled = 0 "
         + "and (ts % ?) = ? and ts < ?";
 
     private static final String SAVE_JOB = "insert into ODE_JOB "
@@ -63,7 +63,7 @@
     private static final String GET_NODEIDS = "select distinct nodeid from 
ODE_JOB";
 
     private static final String SCHEDULE_IMMEDIATE = "select jobid, ts, 
transacted, scheduled, details from ODE_JOB "
-            + "where nodeid = ? and ts < ? order by ts";
+            + "where nodeid = ? and scheduled = 0 and ts < ? order by ts";
 
     private static final String UPDATE_SCHEDULED = "update ODE_JOB set 
scheduled = 1 where jobid in (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)";
 
@@ -179,6 +179,24 @@
             }
             rs.close();
             ps.close();
+            
+            // mark jobs as scheduled, UPDATE_SCHEDULED_SLOTS at a time
+            int j = 0;
+            int updateCount = 0;
+            ps = con.prepareStatement(UPDATE_SCHEDULED);
+            for (int updates = 1; updates <= (ret.size() / 
UPDATE_SCHEDULED_SLOTS) + 1; updates++) {
+                for (int i = 1; i <= UPDATE_SCHEDULED_SLOTS; i++) {
+                    ps.setString(i, j < ret.size() ? ret.get(j).jobId : "");
+                    j++;
+                }
+                ps.execute();
+                updateCount += ps.getUpdateCount();
+            }
+            if (updateCount != ret.size()) {
+                throw new DatabaseException(
+                        "Updating scheduled jobs failed to update all jobs; 
expected=" + ret.size()
+                                + " actual=" + updateCount);
+            }
         } catch (SQLException se) {
             throw new DatabaseException(se);
         } finally {
@@ -286,7 +304,7 @@
                     d = Dialect.SQLSERVER;
                 } else if (dbProductName.indexOf("MySQL") >= 0) {
                     d = Dialect.MYSQL;
-                } else if (dbProductName.indexOf("Sybase") >= 0 || 
dbProductName.indexOf("Adaptive") >= 0) {
+                } else if (dbProductName.indexOf("Sybase") >= 0) {
                     d = Dialect.SYBASE;
                 }
             }

Modified: 
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
--- 
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
 (original)
+++ 
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/Job.java
 Sat Nov 15 09:02:04 2008
@@ -25,7 +25,7 @@
 
 /**
  * Like a task, but a little bit better.
- * 
+ *
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
  */
 class Job extends Task {
@@ -37,7 +37,7 @@
     public Job(long when, boolean transacted, Map<String, Object> jobDetail) {
         this(when, new GUID().toString(),transacted,jobDetail);
     }
-    
+
     public Job(long when, String jobId, boolean transacted,Map<String, Object> 
jobDetail) {
         super(when);
         this.jobId = jobId;
@@ -54,9 +54,5 @@
     public boolean equals(Object obj) {
         return obj instanceof Job && jobId.equals(((Job) obj).jobId);
     }
-    
-    @Override
-    public String toString() {
-        return "Job "+jobId+" transacted: "+transacted+" persisted: 
"+persisted+" details: "+detail;
-    }
+
 }

Modified: 
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
--- 
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
 (original)
+++ 
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SchedulerThread.java
 Sat Nov 15 09:02:04 2008
@@ -31,167 +31,168 @@
 
 /**
  * Implements the "todo" queue and prioritized scheduling mechanism. 
- * 
+ *
  * @author mszefler
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
- * 
+ *
  */
 class SchedulerThread implements Runnable {
 
-       private static final Log __log = 
LogFactory.getLog(SchedulerThread.class);
+    private static final Log __log = LogFactory.getLog(SchedulerThread.class);
 
-       private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
+    private static final int TODO_QUEUE_INITIAL_CAPACITY = 200;
 
-       /** Jobs ready for immediate execution. */
-       private PriorityBlockingQueue<Task> _todo;
+    /** Jobs ready for immediate execution. */
+    private PriorityBlockingQueue<Task> _todo;
 
-       /** Lock for managing the queue */
-       private ReentrantLock _lock = new ReentrantLock();
-
-       private Condition _activity = _lock.newCondition();
-
-       private volatile boolean _done;
-
-       private TaskRunner _taskrunner;
-
-       private Thread _thread;
-
-       SchedulerThread(TaskRunner runner) {
-               _todo = new 
PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
-                               new JobComparatorByDate());
-               _taskrunner = runner;
-       }
-
-       void start() {
-               if (_thread != null)
-                       return;
-
-               _done = false;
-               _thread = new Thread(this, "OdeScheduler");
-               _thread.start();
-       }
-
-       /**
-        * Shutdown the thread.
-        */
-       void stop() {
-               if (_thread == null)
-                       return;
-
-               _done = true;
-               _lock.lock();
-               try {
-                       _activity.signal();
-               } finally {
-                       _lock.unlock();
-
-               }
-
-               while (_thread != null)
-                       try {
-                               _thread.join();
-                               _thread = null;
-                       } catch (InterruptedException e) {
-                               ;
-                       }
-
-       }
-
-       /**
-        * Add a job to the todo queue.
-        * 
-        * @param job
-        */
-       void enqueue(Task task) {
-               _lock.lock();
-               try {
-                       _todo.add(task);
-                       _activity.signal();
-               } finally {
-                       _lock.unlock();
-               }
-       }
-
-       /**
-        * Remove a job to the todo queue.
-        *
-        * @param job
-        */
-       void dequeue(Task task) {
-               _lock.lock();
-               try {
-                       _todo.remove(task);
-                       _activity.signal();
-               } finally {
-                       _lock.unlock();
-               }
-       }
-
-       /**
-        * Get the size of the todo queue.
-        * 
-        * @return
-        */
-       public int size() {
-               return _todo.size();
-       }
-
-       /**
-        * Pop items off the todo queue, and send them to the task runner for 
processing.
-        */
-       public void run() {
-               while (!_done) {
-                       _lock.lock();
-                       try {
-                               long nextjob;
-                               while ((nextjob = nextJobTime()) > 0 && !_done)
-                                       _activity.await(nextjob, 
TimeUnit.MILLISECONDS);
-
-                               if (!_done && nextjob == 0) {
-                                       Task task = _todo.take();
-                                       _taskrunner.runTask(task);
-
-                               }
-                       } catch (InterruptedException ex) {
-                               ; // ignore
-                       } finally {
-                               _lock.unlock();
-                       }
-               }
-       }
-
-       /**
-        * Calculate the time until the next available job.
-        * 
-        * @return time until next job, 0 if one is one is scheduled to go, and 
some
-        *         really large number if there are no jobs to speak of
-        */
-       private long nextJobTime() {
-               assert _lock.isLocked();
-
-               Task job = _todo.peek();
-               if (job == null)
-                       return Long.MAX_VALUE;
-
-               return Math.max(0, job.schedDate - System.currentTimeMillis());
-       }
-
-       /**
-        * Remove the tasks of a given type from the list. 
-        * @param tasktype type of task
-        */
-       public void clearTasks(final Class<? extends Task> tasktype) {
-               _lock.lock();
-               try {
-                       CollectionsX.remove_if(_todo, new 
MemberOfFunction<Task>() {
-                               @Override
-                               public boolean isMember(Task o) {
-                                       return 
tasktype.isAssignableFrom(o.getClass());
-                               }
-
-                       });
-               } finally {
-                       _lock.unlock();
-               }
-       }
+    /** Lock for managing the queue */
+    private ReentrantLock _lock = new ReentrantLock();
+
+    private Condition _activity = _lock.newCondition();
+
+    private volatile boolean _done;
+
+    private TaskRunner _taskrunner;
+
+    private Thread _thread;
+
+    SchedulerThread(TaskRunner runner) {
+        _todo = new PriorityBlockingQueue<Task>(TODO_QUEUE_INITIAL_CAPACITY,
+                new JobComparatorByDate());
+        _taskrunner = runner;
+    }
+
+    void start() {
+        if (_thread != null)
+            return;
+
+        _done = false;
+        _thread = new Thread(this, "OdeScheduler");
+        _thread.start();
+    }
+
+    /**
+     * Shutdown the thread.
+     */
+    void stop() {
+        if (_thread == null)
+            return;
+
+        _done = true;
+        _lock.lock();
+        try {
+            _activity.signal();
+        } finally {
+            _lock.unlock();
+
+        }
+
+        while (_thread != null)
+            try {
+                _thread.join();
+                _thread = null;
+            } catch (InterruptedException e) {
+                ;
+            }
+
+    }
+
+    /**
+     * Add a job to the todo queue.
+     *
+     * @param job
+     */
+    void enqueue(Task task) {
+        _lock.lock();
+        try {
+            _todo.add(task);
+            _activity.signal();
+        } finally {
+            _lock.unlock();
+        }
+    }
+
+    /**
+     * Remove a job to the todo queue.
+     *
+     * @param job
+     */
+    void dequeue(Task task) {
+        _lock.lock();
+        try {
+            _todo.remove(task);
+            _activity.signal();
+        } finally {
+            _lock.unlock();
+        }
+    }
+
+
+    /**
+     * Get the size of the todo queue.
+     *
+     * @return
+     */
+    public int size() {
+        return _todo.size();
+    }
+
+    /**
+     * Pop items off the todo queue, and send them to the task runner for 
processing.
+     */
+    public void run() {
+        while (!_done) {
+            _lock.lock();
+            try {
+                long nextjob;
+                while ((nextjob = nextJobTime()) > 0 && !_done)
+                    _activity.await(nextjob, TimeUnit.MILLISECONDS);
+
+                if (!_done && nextjob == 0) {
+                    Task task = _todo.take();
+                    _taskrunner.runTask(task);
+
+                }
+            } catch (InterruptedException ex) {
+                ; // ignore
+            } finally {
+                _lock.unlock();
+            }
+        }
+    }
+
+    /**
+     * Calculate the time until the next available job.
+     *
+     * @return time until next job, 0 if one is one is scheduled to go, and 
some
+     *         really large number if there are no jobs to speak of
+     */
+    private long nextJobTime() {
+        assert _lock.isLocked();
+
+        Task job = _todo.peek();
+        if (job == null)
+            return Long.MAX_VALUE;
+
+        return Math.max(0, job.schedDate - System.currentTimeMillis());
+    }
+
+    /**
+     * Remove the tasks of a given type from the list.
+     * @param tasktype type of task
+     */
+    public void clearTasks(final Class<? extends Task> tasktype) {
+        _lock.lock();
+        try {
+            CollectionsX.remove_if(_todo, new MemberOfFunction<Task>() {
+                @Override
+                public boolean isMember(Task o) {
+                    return tasktype.isAssignableFrom(o.getClass());
+                }
+
+            });
+        } finally {
+            _lock.unlock();
+        }
+    }
 }

Modified: 
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
--- 
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
 (original)
+++ 
ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java
 Sat Nov 15 09:02:04 2008
@@ -23,14 +23,10 @@
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArraySet;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.transaction.Status;
 import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 
@@ -40,22 +36,19 @@
 import org.apache.ode.bpel.iapi.Scheduler;
 
 /**
- * A reliable and relatively simple scheduler that uses a database to persist 
information about 
- * scheduled tasks.
- * 
- * The challenge is to achieve high performance in a small memory footprint 
without loss of reliability
- * while supporting distributed/clustered configurations.
- * 
- * The design is based around three time horizons: "immediate", "near future", 
and "everything else". 
- * Immediate jobs (i.e. jobs that are about to be up) are written to the 
database and kept in
- * an in-memory priority queue. When they execute, they are removed from the 
database. Near future
- * jobs are placed in the database and assigned to the current node, however 
they are not stored in
- * memory. Periodically jobs are "upgraded" from near-future to immediate 
status, at which point they
- * get loaded into memory. Jobs that are further out in time, are placed in 
the database without a 
- * node identifer; when they are ready to be "upgraded" to near-future jobs 
they are assigned to one
- * of the known live nodes. Recovery is rather straighforward, with stale node 
identifiers being 
- * reassigned to known good nodes.       
- * 
+ * A reliable and relatively simple scheduler that uses a database to persist 
information about scheduled tasks.
+ *
+ * The challange is to achieve high performance in a small memory footprint 
without loss of reliability while supporting
+ * distributed/clustered configurations.
+ *
+ * The design is based around three time horizons: "immediate", "near future", 
and "everything else". Immediate jobs (i.e. jobs that
+ * are about to be up) are written to the database and kept in an in-memory 
priority queue. When they execute, they are removed from
+ * the database. Near future jobs are placed in the database and assigned to 
the current node, however they are not stored in
+ * memory. Periodically jobs are "upgraded" from near-future to immediate 
status, at which point they get loaded into memory. Jobs
+ * that are further out in time, are placed in the database without a node 
identifer; when they are ready to be "upgraded" to
+ * near-future jobs they are assigned to one of the known live nodes. Recovery 
is rather straighforward, with stale node identifiers
+ * being reassigned to known good nodes.
+ *
  * @author Maciej Szefler ( m s z e f l e r @ g m a i l . c o m )
  *
  */
@@ -69,7 +62,7 @@
     long _immediateInterval = 30000;
 
     /**
-     * Jobs scheduled with a time that is between 
(now+immediateInterval,now+nearFutureInterval) will be assigned to the current
+     * Jobs sccheduled with a time that is between 
(now+immediateInterval,now+nearFutureInterval) will be assigned to the current
      * node, but will not be placed on the todo queue (the promoter will pick 
them up).
      */
     long _nearFutureInterval = 10 * 60 * 1000;
@@ -77,17 +70,8 @@
     /** 10s of no communication and you are deemed dead. */
     long _staleInterval = 10000;
 
-    /**
-     * Estimated sustained transaction per second capacity of the system.
-     * e.g. 100 means the system can process 100 jobs per seconds, on average
-     * This number is used to determine how many jobs to load from the 
database at once.
-     */
-    int _tps = 100;
-
     TransactionManager _txm;
 
-    ExecutorService _exec;
-
     String _nodeId;
 
     /** Maximum number of jobs in the "near future" / todo queue. */
@@ -119,26 +103,10 @@
     public SimpleScheduler(String nodeId, DatabaseDelegate del, Properties 
conf) {
         _nodeId = nodeId;
         _db = del;
-        _todoLimit = getIntProperty(conf, "ode.scheduler.queueLength", 
_todoLimit);
-        _immediateInterval = getLongProperty(conf, 
"ode.scheduler.immediateInterval", _immediateInterval);
-        _nearFutureInterval = getLongProperty(conf, 
"ode.scheduler.nearFutureInterval", _nearFutureInterval);
-        _staleInterval = getLongProperty(conf, "ode.scheduler.staleInterval", 
_staleInterval);
-        _tps = getIntProperty(conf, "ode.scheduler.transactionsPerSecond", 
_tps);
+        _todoLimit = 
Integer.parseInt(conf.getProperty("ode.scheduler.queueLength", "10000"));
         _todo = new SchedulerThread(this);
     }
 
-    private int getIntProperty(Properties props, String propName, int 
defaultValue) {
-        String s = props.getProperty(propName);
-        if (s != null) return Integer.parseInt(s);
-        else return defaultValue;
-    }
-
-    private long getLongProperty(Properties props, String propName, long 
defaultValue) {
-        String s = props.getProperty(propName);
-        if (s != null) return Long.parseLong(s);
-        else return defaultValue;
-    }
-        
     public void setNodeId(String nodeId) {
         _nodeId = nodeId;
     }
@@ -155,10 +123,6 @@
         _nearFutureInterval = nearFutureInterval;
     }
 
-    public void setTransactionsPerSecond(int tps) {
-        _tps = tps;
-    }
-
     public void setTransactionManager(TransactionManager txm) {
         _txm = txm;
     }
@@ -167,10 +131,6 @@
         _db = dbd;
     }
 
-    public void setExecutorService(ExecutorService executorService) {
-        _exec = executorService;
-    }
-
     public void cancelJob(String jobId) throws ContextException {
         _todo.dequeue(new Job(0, jobId, false, null));
         try {
@@ -181,20 +141,6 @@
         }
     }
 
-    public <T> Future<T> execIsolatedTransaction(final Callable<T> 
transaction) throws Exception, ContextException {
-        return _exec.submit(new Callable<T>() {
-            public T call() throws Exception {
-                try {
-                    return execTransaction(transaction);
-                } catch (Exception e) {
-                    __log.error("An exception occured while executing an 
isolated transaction, " +
-                            "the transaction is going to be abandoned.", e);
-                    return null;
-                }
-            }
-        });
-    }
-
     public <T> T execTransaction(Callable<T> transaction) throws Exception, 
ContextException {
         try {
             if (__log.isDebugEnabled()) __log.debug("Beginning a new 
transaction");
@@ -221,24 +167,6 @@
         }
     }
 
-    public void registerSynchronizer(final Synchronizer synch) throws 
ContextException {
-        try {
-            _txm.getTransaction().registerSynchronization(new 
Synchronization() {
-
-                public void beforeCompletion() {
-                    synch.beforeCompletion();
-                }
-
-                public void afterCompletion(int status) {
-                    synch.afterCompletion(status == Status.STATUS_COMMITTED);
-                }
-
-            });
-        } catch (Exception e) {
-            throw new ContextException("Unable to register synchronizer.", e);
-        }
-    }
-
     public String schedulePersistedJob(final Map<String, Object> jobDetail, 
Date when) throws ContextException {
         long ctime = System.currentTimeMillis();
         if (when == null)
@@ -254,20 +182,25 @@
 
         try {
             if (immediate) {
+                // If we have too many jobs in the queue, we don't allow any 
new ones
+                if (_todo.size() > _todoLimit) {
+                    __log.error("The execution queue is backed up, the engine 
can't keep up with the load. Either " +
+                            "increase the queue size or regulate the flow.");
+                    return null;
+                }
+
                 // Immediate scheduling means we put it in the DB for safe 
keeping
                 _db.insertJob(job, _nodeId, true);
-                
                 // And add it to our todo list .
-                if (_todo.size() < _todoLimit) {
-                    addTodoOnCommit(job);
-                }
+                addTodoOnCommit(job);
+
                 __log.debug("scheduled immediate job: " + job.jobId);
             } else if (nearfuture) {
                 // Near future, assign the job to ourselves (why? -- this 
makes it very unlikely that we
                 // would get two nodes trying to process the same instance, 
which causes unsightly rollbacks).
                 _db.insertJob(job, _nodeId, false);
                 __log.debug("scheduled near-future job: " + job.jobId);
-            } else /* far future */ {
+            } else /* far future */{
                 // Not the near future, we don't assign a node-id, we'll 
assign it later.
                 _db.insertJob(job, null, false);
                 __log.debug("scheduled far-future job: " + job.jobId);
@@ -302,9 +235,6 @@
         if (_running)
             return;
 
-        if (_exec == null)
-            _exec = Executors.newCachedThreadPool();
-
         _todo.clearTasks(UpgradeJobsTask.class);
         _todo.clearTasks(LoadImmediateTask.class);
         _todo.clearTasks(CheckStaleNodes.class);
@@ -325,28 +255,23 @@
             throw new ContextException("Error retrieving node list.", ex);
         }
 
-        long now = System.currentTimeMillis();
-        
         // Pretend we got a heartbeat...
-        for (String s : _knownNodes) _lastHeartBeat.put(s, now);
+        for (String s : _knownNodes)
+            _lastHeartBeat.put(s, System.currentTimeMillis());
 
         // schedule immediate job loading for now!
-        _todo.enqueue(new LoadImmediateTask(now));
+        _todo.enqueue(new LoadImmediateTask(System.currentTimeMillis()));
 
         // schedule check for stale nodes, make it random so that the nodes 
don't overlap.
-        _todo.enqueue(new CheckStaleNodes(now + randomMean(_staleInterval)));
+        _todo.enqueue(new CheckStaleNodes(System.currentTimeMillis() + (long) 
(_random.nextDouble() * _staleInterval)));
 
         // do the upgrade sometime (random) in the immediate interval.
-        _todo.enqueue(new UpgradeJobsTask(now + 
randomMean(_immediateInterval)));
+        _todo.enqueue(new UpgradeJobsTask(System.currentTimeMillis() + (long) 
(_random.nextDouble() * _immediateInterval)));
 
         _todo.start();
         _running = true;
     }
-    
-    private long randomMean(long mean) {
-        return (long) _random.nextDouble() * mean + (mean/2);
-    }
-        
+
     public synchronized void stop() {
         if (!_running)
             return;
@@ -358,6 +283,28 @@
         _running = false;
     }
 
+    public void jobCompleted(String jobId) {
+        boolean deleted = false;
+        try {
+            deleted = _db.deleteJob(jobId, _nodeId);
+        } catch (DatabaseException de) {
+            String errmsg = "Database error.";
+            __log.error(errmsg, de);
+            throw new ContextException(errmsg, de);
+        }
+
+        if (!deleted) {
+            try {
+                _txm.getTransaction().setRollbackOnly();
+            } catch (Exception ex) {
+                __log.error("Transaction manager error; setRollbackOnly() 
failed.", ex);
+            }
+
+            throw new ContextException("Job no longer in database: jobId=" + 
jobId);
+        }
+    }
+
+
     /**
      * Run a job in the current thread.
      *
@@ -368,70 +315,53 @@
         final Scheduler.JobInfo jobInfo = new Scheduler.JobInfo(job.jobId, 
job.detail,
                 (Integer)(job.detail.get("retry") != null ? 
job.detail.get("retry") : 0));
 
-        _exec.submit(new Callable<Void>() {
-            public Void call() throws Exception {
-                if (job.transacted) {
-                    try {
-                        execTransaction(new Callable<Void>() {
-                            public Void call() throws Exception {
-                                if (job.persisted)
-                                    if (!_db.deleteJob(job.jobId, _nodeId))
-                                        throw new 
JobNoLongerInDbException(job.jobId,_nodeId);
-                                    
-                                try {
-                                    _jobProcessor.onScheduledJob(jobInfo);
-                                } catch (JobProcessorException jpe) {
-                                    if (jpe.retry) {
-                                        int retry = job.detail.get("retry") != 
null ? (((Integer)job.detail.get("retry")) + 1) : 0;
-                                        if (retry <= 10) {
-                                            long delay = doRetry(job);
-                                            __log.error("Error while 
processing transaction, retrying in " + delay + "s");
-                                        } else {
-                                            __log.error("Error while 
processing transaction after 10 retries, no more retries:"+job);
-                                        }
-                                    } else {
-                                        __log.error("Error while processing 
transaction, no retry.", jpe);
-                                    }
-                                }
-                                return null;
-                            }
-                        });
-                    } catch (JobNoLongerInDbException jde) {
-                        // This may happen if two node try to do the same 
job... we try to avoid
-                        // it the synchronization is a best-effort but not 
perfect.
-                        __log.debug("job no longer in db forced rollback.");
-                    } catch (Exception ex) {
-                        __log.error("Error while executing transaction", ex);
-                    }
-                } else {
-                    _jobProcessor.onScheduledJob(jobInfo);
-                }
-                return null;
+        try {
+            try {
+                _jobProcessor.onScheduledJob(jobInfo);
+            } catch (JobProcessorException jpe) {
+                if (jpe.retry)
+                    __log.error("Error while processing transaction, retrying 
in " + doRetry(job) + "s");
+                else
+                    __log.error("Error while processing transaction, no 
retry.", jpe);
             }
-        });
+        } catch (Exception ex) {
+            __log.error("Error in scheduler processor.", ex);
+        }
+
     }
 
     private void addTodoOnCommit(final Job job) {
-        registerSynchronizer(new Synchronizer() {
 
-            public void afterCompletion(boolean success) {
-                if (success) {
-                    _todo.enqueue(job);
+        Transaction tx;
+        try {
+            tx = _txm.getTransaction();
+        } catch (Exception ex) {
+            String errmsg = "Transaction manager error; unable to obtain 
transaction.";
+            __log.error(errmsg, ex);
+            throw new ContextException(errmsg, ex);
+        }
+
+        if (tx == null)
+            throw new ContextException("Missing required transaction in thread 
" + Thread.currentThread());
+
+        try {
+            tx.registerSynchronization(new Synchronization() {
+
+                public void afterCompletion(int status) {
+                    if (status == Status.STATUS_COMMITTED) {
+                        _todo.enqueue(job);
+                    }
                 }
-            }
 
-            public void beforeCompletion() {
-            }
+                public void beforeCompletion() {
+                }
 
-        });
-    }
+            });
 
-    public boolean isTransacted() {
-        try {
-            Transaction tx = _txm.getTransaction();
-            return (tx != null && tx.getStatus() != 
Status.STATUS_NO_TRANSACTION);
-        } catch (SystemException e) {
-            throw new ContextException("Internal Error: Could not obtain 
transaction status.");
+        } catch (Exception e) {
+            String errmsg = "Unable to registrer synchronizer. ";
+            __log.error(errmsg, e);
+            throw new ContextException(errmsg, e);
         }
     }
 
@@ -455,25 +385,21 @@
 
     boolean doLoadImmediate() {
         __log.debug("LOAD IMMEDIATE started");
-        
-        // don't load anything if we're already half-full;  we've got plenty 
to do already
-        if (_todo.size() > _todoLimit/2) return true;
-        
         List<Job> jobs;
         try {
-            final int batch = (int) (_immediateInterval * _tps / 1000);
-            jobs = execTransaction(new Callable<List<Job>>() {
-                public List<Job> call() throws Exception {
-                    return _db.dequeueImmediate(_nodeId, 
System.currentTimeMillis() + _immediateInterval, batch);
-                }
-            });
-            for (Job j : jobs) {
-                if (__log.isDebugEnabled())
-                    __log.debug("todo.enqueue job from db: " + j.jobId + " for 
" + j.schedDate);
+            do {
+                jobs = execTransaction(new Callable<List<Job>>() {
+                    public List<Job> call() throws Exception {
+                        return _db.dequeueImmediate(_nodeId, 
System.currentTimeMillis() + _immediateInterval, 10);
+                    }
+                });
+                for (Job j : jobs) {
+                    if (__log.isDebugEnabled())
+                        __log.debug("todo.enqueue job from db: " + j.jobId + " 
for " + j.schedDate);
 
-                if (_todo.size() < _todoLimit) 
                     _todo.enqueue(j);
-            }
+                }
+            } while (jobs.size() == 10);
             return true;
         } catch (Exception ex) {
             __log.error("Error loading immediate jobs from database.", ex);
@@ -521,6 +447,7 @@
 
     /**
      * Re-assign stale node's jobs to self.
+     *
      * @param nodeId
      */
     void recoverStaleNode(final String nodeId) {
@@ -549,7 +476,6 @@
         } finally {
             __log.debug("node recovery complete");
         }
-
     }
 
     private long doRetry(Job job) throws DatabaseException {
@@ -581,9 +507,9 @@
                 success = doLoadImmediate();
             } finally {
                 if (success)
-                    _todo.enqueue(new 
LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * 
.90)));
+                    _todo.enqueue(new 
LoadImmediateTask(System.currentTimeMillis() + (long) (_immediateInterval * 
.75)));
                 else
-                    _todo.enqueue(new 
LoadImmediateTask(System.currentTimeMillis() + 1000));
+                    _todo.enqueue(new 
LoadImmediateTask(System.currentTimeMillis() + 100));
             }
         }
 
@@ -591,6 +517,7 @@
 
     /**
      * Upgrade jobs from far future to immediate future (basically, assign 
them to a node).
+     *
      * @author mszefler
      *
      */
@@ -617,7 +544,7 @@
             try {
                 success = doUpgrade();
             } finally {
-                long future = System.currentTimeMillis() + (success ? (long) 
(_nearFutureInterval * .50) : 1000);
+                long future = System.currentTimeMillis() + (success ? (long) 
(_nearFutureInterval * .50) : 100);
                 _nextUpgrade.set(future);
                 _todo.enqueue(new UpgradeJobsTask(future));
                 __log.debug("UPGRADE completed, success = " + success + "; 
next time in " + (future - ctime) + "ms");
@@ -640,16 +567,11 @@
             __log.debug("CHECK STALE NODES started");
             for (String nodeId : _knownNodes) {
                 Long lastSeen = _lastHeartBeat.get(nodeId);
-                if ((lastSeen == null || (System.currentTimeMillis() - 
lastSeen) > _staleInterval)
-                    && !_nodeId.equals(nodeId))
-                {
+                if (lastSeen == null || (System.currentTimeMillis() - 
lastSeen) > _staleInterval)
                     recoverStaleNode(nodeId);
-                }
             }
         }
 
-
     }
 
-
 }

Modified: 
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
--- 
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
 (original)
+++ 
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/JdbcDelegateTest.java
 Sat Nov 15 09:02:04 2008
@@ -43,7 +43,7 @@
         _ds = new DelegateSupport();
         _del = _ds.delegate();
     }
-    
+     
     
     public void testGetNodeIds() throws Exception {
         // should have no node ids in the db, empty list (not null)
@@ -91,9 +91,8 @@
         assertEquals("j1",jobs.get(0).jobId);
         jobs = _del.dequeueImmediate("n1", 250L, 1000);
         assertNotNull(jobs);
-        assertEquals(2, jobs.size());
-        assertEquals("j1",jobs.get(0).jobId);
-        assertEquals("j2",jobs.get(1).jobId);
+        assertEquals(1, jobs.size());
+        assertEquals("j2",jobs.get(0).jobId);
     }
     
     public void testScheduleImmediateMaxRows() throws Exception {
@@ -104,6 +103,10 @@
         assertNotNull(jobs);
         assertEquals(1, jobs.size());
         assertEquals("j1",jobs.get(0).jobId);
+        jobs = _del.dequeueImmediate("n1", 250L, 1000);
+        assertNotNull(jobs);
+        assertEquals(1, jobs.size());
+        assertEquals("j2",jobs.get(0).jobId);
     }
 
     public void testScheduleImmediateNodeFilter() throws Exception {

Modified: 
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
--- 
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
 (original)
+++ 
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/RetriesTest.java
 Sat Nov 15 09:02:04 2008
@@ -51,7 +51,7 @@
 
     public void onScheduledJob(Scheduler.JobInfo jobInfo) throws 
Scheduler.JobProcessorException {
         _tried++;
-        throw new Scheduler.JobProcessorException(jobInfo.retryCount < 3);
+        throw new Scheduler.JobProcessorException(jobInfo.retryCount < 2);
     }
 
     Map<String, Object> newDetail(String x) {

Modified: 
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
--- 
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
 (original)
+++ 
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SchedulerThreadTest.java
 Sat Nov 15 09:02:04 2008
@@ -52,7 +52,7 @@
         _st.start();
         long schedtime = System.currentTimeMillis() + 300;
         _st.enqueue(new Task(schedtime));
-        Thread.sleep(1000);
+        Thread.sleep(600);
         assertEquals(1,_tasks.size());
         assertTrue(_tasks.get(0).time < schedtime + SCHED_TOLERANCE / 2);
         assertTrue(_tasks.get(0).time > schedtime - SCHED_TOLERANCE / 2);

Modified: 
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
URL: 
http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java?rev=717879&r1=717878&r2=717879&view=diff
==============================================================================
--- 
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
 (original)
+++ 
ode/trunk/scheduler-simple/src/test/java/org/apache/ode/scheduler/simple/SimpleSchedulerTest.java
 Sat Nov 15 09:02:04 2008
@@ -21,34 +21,29 @@
 
 import java.util.*;
 
-import javax.transaction.RollbackException;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
 import javax.transaction.TransactionManager;
 
 import junit.framework.TestCase;
 
+import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 import org.apache.ode.bpel.iapi.Scheduler.JobInfo;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessor;
 import org.apache.ode.bpel.iapi.Scheduler.JobProcessorException;
-import org.apache.geronimo.transaction.manager.GeronimoTransactionManager;
 
 public class SimpleSchedulerTest extends TestCase implements JobProcessor {
 
     DelegateSupport _ds;
     SimpleScheduler _scheduler;
     ArrayList<JobInfo> _jobs;
-    ArrayList<JobInfo> _commit;
     TransactionManager _txm;
 
+
     public void setUp() throws Exception {
         _txm = new GeronimoTransactionManager();
         _ds = new DelegateSupport();
 
         _scheduler = newScheduler("n1");
         _jobs = new ArrayList<JobInfo>(100);
-        _commit = new ArrayList<JobInfo>(100);
     }
 
     public void tearDown() throws Exception {
@@ -57,25 +52,21 @@
 
     public void testConcurrentExec() throws Exception  {
         _scheduler.start();
-        for (int i=0; i<10; i++) {
-            _txm.begin();
-            String jobId;
-            try {
-                int jobs = _jobs.size();
-                jobId = _scheduler.schedulePersistedJob(newDetail("123"), new 
Date(System.currentTimeMillis() + 200));
-                Thread.sleep(100);
-                // Make sure we don't schedule until commit.
-                assertEquals(jobs, _jobs.size());
-            } finally {
-                _txm.commit();
-            }
-            // Delete from DB
-            assertEquals(true,_ds.delegate().deleteJob(jobId, "n1"));
-            // Wait for the job to be execed.
-            Thread.sleep(250);
-            // We should always have same number of jobs/commits
-            assertEquals(_jobs.size(), _commit.size());
+        _txm.begin();
+        String jobId;
+        try {
+            jobId = _scheduler.schedulePersistedJob(newDetail("123"), new 
Date(System.currentTimeMillis() + 100));
+            Thread.sleep(200);
+            // Make sure we don't schedule until commit.
+            assertEquals(0, _jobs.size());
+        } finally {
+            _txm.commit();
         }
+        // Wait for the job to be execed.
+        Thread.sleep(100);
+        // Should execute job,
+        assertEquals(1, _jobs.size());
+
     }
     
     public void testImmediateScheduling() throws Exception {
@@ -116,124 +107,102 @@
 
     public void testNearFutureScheduling() throws Exception {
         // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(10000);
-        _scheduler.setImmediateInterval(5000);
+        _scheduler.setNearFutureInterval(1000);
+        _scheduler.setImmediateInterval(500);
         _scheduler.start();
 
         _txm.begin();
         try {
-            _scheduler.schedulePersistedJob(newDetail("123"), new 
Date(System.currentTimeMillis() + 7500));
+            _scheduler.schedulePersistedJob(newDetail("123"), new 
Date(System.currentTimeMillis() + 750));
         } finally {
             _txm.commit();
         }
 
-        Thread.sleep(8500);
+        Thread.sleep(850);
         assertEquals(1, _jobs.size());
     }
 
     public void testFarFutureScheduling() throws Exception {
         // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(7000);
-        _scheduler.setImmediateInterval(3000);
+        _scheduler.setNearFutureInterval(700);
+        _scheduler.setImmediateInterval(300);
         _scheduler.start();
 
         _txm.begin();
         try {
-            _scheduler.schedulePersistedJob(newDetail("123"), new 
Date(System.currentTimeMillis() + 7500));
+            _scheduler.schedulePersistedJob(newDetail("123"), new 
Date(System.currentTimeMillis() + 750));
         } finally {
             _txm.commit();
         }
 
-        Thread.sleep(8500);
+        Thread.sleep(850);
         assertEquals(1, _jobs.size());
     }
 
     public void testRecovery() throws Exception {
         // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(2000);
-        _scheduler.setImmediateInterval(1000);
-        _scheduler.setStaleInterval(500);
+        _scheduler.setNearFutureInterval(200);
+        _scheduler.setImmediateInterval(100);
+        _scheduler.setStaleInterval(50);
 
         _txm.begin();
         try {
             _scheduler.schedulePersistedJob(newDetail("immediate"), new 
Date(System.currentTimeMillis()));
-            _scheduler.schedulePersistedJob(newDetail("near"), new 
Date(System.currentTimeMillis() + 1100));
-            _scheduler.schedulePersistedJob(newDetail("far"), new 
Date(System.currentTimeMillis() + 2500));
+            _scheduler.schedulePersistedJob(newDetail("near"), new 
Date(System.currentTimeMillis() + 110));
+            _scheduler.schedulePersistedJob(newDetail("far"), new 
Date(System.currentTimeMillis() + 250));
         } finally {
             _txm.commit();
         }
 
         _scheduler = newScheduler("n3");
-        _scheduler.setNearFutureInterval(2000);
-        _scheduler.setImmediateInterval(1000);
-        _scheduler.setStaleInterval(1000);
+        _scheduler.setNearFutureInterval(200);
+        _scheduler.setImmediateInterval(100);
+        _scheduler.setStaleInterval(50);
         _scheduler.start();
-        Thread.sleep(4000);
+        Thread.sleep(400);
         assertEquals(3, _jobs.size());
     }
 
     public void testRecoverySuppressed() throws Exception {
         // speed things up a bit to hit the right code paths
-        _scheduler.setNearFutureInterval(2000);
-        _scheduler.setImmediateInterval(1000);
-        _scheduler.setStaleInterval(500);
+        _scheduler.setNearFutureInterval(200);
+        _scheduler.setImmediateInterval(100);
+        _scheduler.setStaleInterval(50);
 
+        // schedule some jobs ...
         _txm.begin();
         try {
             _scheduler.schedulePersistedJob(newDetail("immediate"), new 
Date(System.currentTimeMillis()));
-            _scheduler.schedulePersistedJob(newDetail("near"), new 
Date(System.currentTimeMillis() + 1100));
-            _scheduler.schedulePersistedJob(newDetail("far"), new 
Date(System.currentTimeMillis() + 15000));
+            _scheduler.schedulePersistedJob(newDetail("near"), new 
Date(System.currentTimeMillis() + 150));
+            _scheduler.schedulePersistedJob(newDetail("far"), new 
Date(System.currentTimeMillis() + 250));
         } finally {
             _txm.commit();
-        }
-        _scheduler.stop();
+        } 
 
-        _scheduler = newScheduler("n3");
-        _scheduler.setNearFutureInterval(2000);
-        _scheduler.setImmediateInterval(1000);
-        _scheduler.setStaleInterval(1000);
-        _scheduler.start();
+        // but don't start the scheduler.... 
+        
+        // create a second node for the scheduler. 
+        SimpleScheduler scheduler = newScheduler("n3");
+        scheduler.setNearFutureInterval(200);
+        scheduler.setImmediateInterval(100);
+        scheduler.setStaleInterval(50);
+        scheduler.start();
         for (int i = 0; i < 40; ++i) {
-            _scheduler.updateHeartBeat("n1");
-            Thread.sleep(100);
+            scheduler.updateHeartBeat("n1");
+            Thread.sleep(10);
         }
 
-        _scheduler.stop();
-        Thread.sleep(1000);
+        scheduler.stop();
 
-        assertEquals(0, _jobs.size());
+        assertTrue(_jobs.size() <= 1);
+        if (_jobs.size() == 1)
+            assertEquals("far", _jobs.get(0).jobDetail.get("foo"));
     }
 
     public void onScheduledJob(final JobInfo jobInfo) throws 
JobProcessorException {
         synchronized (_jobs) {
             _jobs.add(jobInfo);
         }
-        
-        try {
-            _txm.getTransaction().registerSynchronization(new 
Synchronization() {
-
-                public void afterCompletion(int arg0) {
-                    if (arg0 == Status.STATUS_COMMITTED) 
-                        _commit.add(jobInfo);
-                }
-
-                public void beforeCompletion() {
-                    // TODO Auto-generated method stub
-                    
-                }
-                
-            });
-        } catch (IllegalStateException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        } catch (RollbackException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        } catch (SystemException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-        }
-
     }
 
     Map<String, Object> newDetail(String x) {


Reply via email to