Author: agilliland
Date: Thu Jul 26 11:04:12 2007
New Revision: 559911

URL: http://svn.apache.org/viewvc?view=rev&rev=559911
Log:
first pass at fixing up task running issues (ROL-1294, ROL-1446).  This commit 
introduces a couple significant changes to the way scheduled tasks are handled 
...

1. Tasks are no longer scheduled using a ScheduledExecutorService in the thread 
manager impl.  We have replaced that functionality by implementing a custom 
TaskScheduler class with runs on its own thread and executes the scheduled 
tasks.  This was done to work around some inconsistencies with using 
ScheduledExecutorService as well as to implement some behavior specific to the 
way Weblogger tasks are run.

2. We have corrected for an issue with tasks missing appropriate execution 
times due to time drifting and clock mismatch issues created by the leasing 
process.  The has been fixed by updating the leasing logic so that it no longer 
has any affiliation with trying to prevent tasks from running before their set 
interval time and by using the custom scheduler in #1 to better synchronize the 
scheduled runtimes for tasks.

Added:
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/TaskScheduler.java
Modified:
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/hibernate/HibernateThreadManagerImpl.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/jpa/JPAThreadManagerImpl.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/pings/PingQueueTask.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ResetHitCountsTask.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/RollerTask.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/RollerTaskWithLeasing.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ScheduledEntriesTask.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ThreadManager.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ThreadManagerImpl.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/TurnoverReferersTask.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/planet/tasks/RefreshRollerPlanetTask.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/planet/tasks/SyncWebsitesTask.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.hbm.xml
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.java
    
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.orm.xml
    
roller/trunk/apps/weblogger/test/java/org/apache/roller/weblogger/business/TaskLockTest.java

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/hibernate/HibernateThreadManagerImpl.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/hibernate/HibernateThreadManagerImpl.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/hibernate/HibernateThreadManagerImpl.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/hibernate/HibernateThreadManagerImpl.java
 Thu Jul 26 11:04:12 2007
@@ -21,6 +21,7 @@
 import java.util.Date;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.roller.util.DateUtil;
 import org.apache.roller.weblogger.WebloggerException;
 import org.apache.roller.weblogger.business.Weblogger;
 import org.apache.roller.weblogger.business.runnable.ThreadManagerImpl;
@@ -59,16 +60,24 @@
     
     
     /**
-     * Try to aquire a lock for a given RollerTask.
+     * Try to aquire a lease for a given RollerTask.
      */
+    @Override
     public boolean registerLease(RollerTask task) {
         
+        log.debug("Attempting to register lease for task - "+task.getName());
+        
+        // keep a copy of the current time
+        Date currentTime = new Date();
+        
         // query for existing lease record first
         TaskLock taskLock = null;
         try {
             taskLock = this.getTaskLockByName(task.getName());
             
             if(taskLock == null) {
+                log.debug("Task record does not exist, inserting empty record 
to start with");
+                
                 // insert an empty record, then we will actually acquire the
                 // lease below using an update statement 
                 taskLock = new TaskLock();
@@ -89,25 +98,44 @@
         // try to acquire lease
         try {
             // calculate lease expiration time
-            // expireTime = startTime + (timeLeased * 60sec/min) - 1 sec
-            // we remove 1 second to adjust for precision differences
-            long leaseExpireTime = taskLock.getTimeAquired().getTime()+
-                    (60000*taskLock.getTimeLeased())-1000;
+            Date leaseExpiration = taskLock.getLeaseExpiration();
+            
+            // calculate run time for task, this is expected time, not actual 
time
+            // i.e. if a task is meant to run daily at midnight this should
+            // reflect 00:00:00 on the current day
+            Date runTime = currentTime;
+            if("startOfDay".equals(task.getStartTimeDesc())) {
+                // start of today
+                runTime = DateUtil.getStartOfDay(currentTime);
+            } else if("startOfHour".equals(task.getStartTimeDesc())) {
+                // start of this hour
+                runTime = DateUtil.getStartOfHour(currentTime);
+            } else {
+                // start of this minute
+                runTime = DateUtil.getStartOfMinute(currentTime);
+            }
+            
+            if(log.isDebugEnabled()) {
+                log.debug("last run = "+taskLock.getLastRun());
+                log.debug("new run time = "+runTime);
+                log.debug("last acquired = "+taskLock.getTimeAquired());
+                log.debug("time leased = "+taskLock.getTimeLeased());
+                log.debug("lease expiration = "+leaseExpiration);
+            }
             
-            Session session = 
((HibernatePersistenceStrategy)this.strategy).getSession();
+            Session session = strategy.getSession();
             String queryHQL = "update TaskLock "+
-                    "set client=:client, timeacquired=current_timestamp(), 
timeleased=:timeleased "+
-                    "where name=:name and timeacquired=:timeacquired "+
-                    "and :leaseends < current_timestamp()";
+                    "set client=:client, timeacquired=current_timestamp(), 
timeleased=:timeleased, lastrun=:runTime "+
+                    "where name=:name and timeacquired=:timeacquired and 
current_timestamp() > :leaseends";
             Query query = session.createQuery(queryHQL);
             query.setString("client", task.getClientId());
             query.setInteger("timeleased", task.getLeaseTime());
+            query.setTimestamp("runTime", runTime);
             query.setString("name", task.getName());
             query.setTimestamp("timeacquired", taskLock.getTimeAquired());
-            query.setTimestamp("leaseends", new Date(leaseExpireTime));
+            query.setTimestamp("leaseends", leaseExpiration);
             int result = query.executeUpdate();
             
-            // this may not be needed
             roller.flush();
             
             if(result == 1) {
@@ -126,6 +154,7 @@
     /**
      * Try to release the lock for a given RollerTask.
      */
+    @Override
     public boolean unregisterLease(RollerTask task) {
         
         // query for existing lease record first
@@ -144,11 +173,11 @@
         
         // try to release lease, just set lease time to 0
         try {
-            Session session = 
((HibernatePersistenceStrategy)this.strategy).getSession();
+            Session session = strategy.getSession();
             String queryHQL = "update TaskLock set timeLeased=:interval "+
                     "where name=:name and client=:client";
             Query query = session.createQuery(queryHQL);
-            query.setInteger("interval", task.getInterval());
+            query.setInteger("interval", 0);
             query.setString("name", task.getName());
             query.setString("client", task.getClientId());
             int result = query.executeUpdate();
@@ -169,11 +198,14 @@
     }
     
     
-    private TaskLock getTaskLockByName(String name) throws WebloggerException {
+    /**
+     * @inheritDoc
+     */
+    public TaskLock getTaskLockByName(String name) throws WebloggerException {
         
         // do lookup
         try {
-            Session session = 
((HibernatePersistenceStrategy)this.strategy).getSession();
+            Session session = strategy.getSession();
             Criteria criteria = session.createCriteria(TaskLock.class);
             
             criteria.add(Expression.eq("name", name));
@@ -186,7 +218,10 @@
     }
     
     
-    private void saveTaskLock(TaskLock data) throws WebloggerException {
+    /**
+     * @inheritDoc
+     */
+    public void saveTaskLock(TaskLock data) throws WebloggerException {
         this.strategy.store(data);
     }
     

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/jpa/JPAThreadManagerImpl.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/jpa/JPAThreadManagerImpl.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/jpa/JPAThreadManagerImpl.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/jpa/JPAThreadManagerImpl.java
 Thu Jul 26 11:04:12 2007
@@ -24,7 +24,7 @@
 import javax.persistence.Query;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.roller.weblogger.business.jpa.JPAPersistenceStrategy;
+import org.apache.roller.util.DateUtil;
 import org.apache.roller.weblogger.WebloggerException;
 import org.apache.roller.weblogger.business.Weblogger;
 import org.apache.roller.weblogger.business.runnable.ThreadManagerImpl;
@@ -61,13 +61,22 @@
     /**
      * Try to aquire a lock for a given RollerTask.
      */
+    @Override
     public boolean registerLease(RollerTask task) {
+        
+        log.debug("Attempting to register lease for task - "+task.getName());
+        
+        // keep a copy of the current time
+        Date currentTime = new Date();
+        
         // query for existing lease record first
         TaskLock taskLock = null;
         try {
             taskLock = this.getTaskLockByName(task.getName());
 
             if(taskLock == null) {
+                log.debug("Task record does not exist, inserting empty record 
to start with");
+                
                 // insert an empty record, then we will actually acquire the
                 // lease below using an update statement
                 taskLock = new TaskLock();
@@ -88,18 +97,39 @@
         // try to acquire lease
         try {
             // calculate lease expiration time
-            // expireTime = startTime + (timeLeased * 60sec/min) - 1 sec
-            // we remove 1 second to adjust for precision differences
-            long leaseExpireTime = taskLock.getTimeAquired().getTime()+
-                    (60000*taskLock.getTimeLeased())-1000;
+            Date leaseExpiration = taskLock.getLeaseExpiration();
+            
+            // calculate run time for task, this is expected time, not actual 
time
+            // i.e. if a task is meant to run daily at midnight this should
+            // reflect 00:00:00 on the current day
+            Date runTime = currentTime;
+            if("startOfDay".equals(task.getStartTimeDesc())) {
+                // start of today
+                runTime = DateUtil.getStartOfDay(currentTime);
+            } else if("startOfHour".equals(task.getStartTimeDesc())) {
+                // start of this hour
+                runTime = DateUtil.getStartOfHour(currentTime);
+            } else {
+                // start of this minute
+                runTime = DateUtil.getStartOfMinute(currentTime);
+            }
+            
+            if(log.isDebugEnabled()) {
+                log.debug("last run = "+taskLock.getLastRun());
+                log.debug("new run time = "+runTime);
+                log.debug("last acquired = "+taskLock.getTimeAquired());
+                log.debug("time leased = "+taskLock.getTimeLeased());
+                log.debug("lease expiration = "+leaseExpiration);
+            }
 
             Query q = strategy.getNamedUpdate(
-                    
"TaskLock.updateClient&Timeacquired&TimeleasedByName&Timeacquired");
+                    
"TaskLock.updateClient&Timeacquired&Timeleased&LastRunByName&Timeacquired");
             q.setParameter(1, task.getClientId());
             q.setParameter(2, Integer.valueOf(task.getLeaseTime()));
-            q.setParameter(3, task.getName());
-            q.setParameter(4, taskLock.getTimeAquired());
-            q.setParameter(5, new Timestamp(leaseExpireTime));
+            q.setParameter(3, new Timestamp(runTime.getTime()));
+            q.setParameter(4, task.getName());
+            q.setParameter(5, taskLock.getTimeAquired());
+            q.setParameter(6, new Timestamp(leaseExpiration.getTime()));
             int result = q.executeUpdate();
             
             if(result == 1) {
@@ -118,6 +148,7 @@
     /**
      * Try to release the lock for a given RollerTask.
      */
+    @Override
     public boolean unregisterLease(RollerTask task) {
 
         // query for existing lease record first
@@ -138,7 +169,7 @@
         try {
             Query q = strategy.getNamedUpdate(
                     "TaskLock.updateTimeLeasedByName&Client");
-            q.setParameter(1, Integer.valueOf(task.getInterval()));
+            q.setParameter(1, Integer.valueOf(0));
             q.setParameter(2, task.getName());
             q.setParameter(3, task.getClientId());
             int result = q.executeUpdate();
@@ -156,8 +187,11 @@
 
     }
     
-
-    private TaskLock getTaskLockByName(String name) throws WebloggerException {
+    
+    /**
+     * @inheritDoc
+     */
+    public TaskLock getTaskLockByName(String name) throws WebloggerException {
         // do lookup
         Query q = strategy.getNamedQuery("TaskLock.getByName");
         q.setParameter(1, name);
@@ -169,7 +203,10 @@
     }
 
     
-    private void saveTaskLock(TaskLock data) throws WebloggerException {
+    /**
+     * @inheritDoc
+     */
+    public void saveTaskLock(TaskLock data) throws WebloggerException {
         this.strategy.store(data);
     }
 

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/pings/PingQueueTask.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/pings/PingQueueTask.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/pings/PingQueueTask.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/pings/PingQueueTask.java
 Thu Jul 26 11:04:12 2007
@@ -67,6 +67,10 @@
         return getAdjustedTime(currentTime, startTimeDesc);
     }
     
+    public String getStartTimeDesc() {
+        return startTimeDesc;
+    }
+    
     public int getInterval() {
         return this.interval;
     }

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ResetHitCountsTask.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ResetHitCountsTask.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ResetHitCountsTask.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ResetHitCountsTask.java
 Thu Jul 26 11:04:12 2007
@@ -60,6 +60,10 @@
         return getAdjustedTime(currentTime, startTimeDesc);
     }
     
+    public String getStartTimeDesc() {
+        return startTimeDesc;
+    }
+    
     public int getInterval() {
         return this.interval;
     }

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/RollerTask.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/RollerTask.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/RollerTask.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/RollerTask.java
 Thu Jul 26 11:04:12 2007
@@ -81,6 +81,16 @@
     
     
     /**
+     * Get a string description of the start time of the given task.
+     * 
+     * Should be one of ... 'immediate', 'startOfDay', 'startOfHour'
+     * 
+     * @return The start time description.
+     */
+    public abstract String getStartTimeDesc();
+    
+    
+    /**
      * How often should the task run, in seconds.
      *
      * example: 3600 means this task runs once every hour.

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/RollerTaskWithLeasing.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/RollerTaskWithLeasing.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/RollerTaskWithLeasing.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/RollerTaskWithLeasing.java
 Thu Jul 26 11:04:12 2007
@@ -55,33 +55,33 @@
         
         boolean lockAcquired = false;
         try {
-            log.debug("Attempting to acquire lock");
+            log.debug(getName()+": Attempting to acquire lease");
             
             lockAcquired = mgr.registerLease(this);
             
             // now if we have a lock then run the task
             if(lockAcquired) {
-                log.debug("Lock acquired, running task");
+                log.debug(getName()+": Lease acquired, running task");
                 this.runTask();
             } else {
-                log.debug("Lock NOT acquired, cannot continue");
+                log.debug(getName()+": Lease NOT acquired, cannot continue");
                 return;
             }
             
         } catch (Exception ex) {
-            log.error("Unexpected exception running task", ex);
+            log.error(getName()+": Unexpected exception", ex);
         } finally {
             
             if(lockAcquired) {
                 
-                log.debug("Attempting to release lock");
+                log.debug(getName()+": Attempting to release lease");
                 
                 boolean lockReleased = mgr.unregisterLease(this);
                 
                 if(lockReleased) {
-                    log.debug("Lock released, time to sleep");
+                    log.debug(getName()+": Lease released, task finished");
                 } else {
-                    log.debug("Lock NOT released, some kind of problem");
+                    log.debug(getName()+": Lease NOT released, some kind of 
problem");
                 }
             }
             

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ScheduledEntriesTask.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ScheduledEntriesTask.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ScheduledEntriesTask.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ScheduledEntriesTask.java
 Thu Jul 26 11:04:12 2007
@@ -66,6 +66,10 @@
         return getAdjustedTime(currentTime, startTimeDesc);
     }
     
+    public String getStartTimeDesc() {
+        return startTimeDesc;
+    }
+    
     public int getInterval() {
         return this.interval;
     }

Added: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/TaskScheduler.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/TaskScheduler.java?view=auto&rev=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/TaskScheduler.java
 (added)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/TaskScheduler.java
 Thu Jul 26 11:04:12 2007
@@ -0,0 +1,185 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  contributor license agreements.  The ASF licenses this file to You
+ * under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.  For additional information regarding
+ * copyright in this work, please see the NOTICE file in the top level
+ * directory of this distribution.
+ */
+
+package org.apache.roller.weblogger.business.runnable;
+
+import java.util.ArrayList;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.List;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.roller.util.DateUtil;
+import org.apache.roller.weblogger.business.WebloggerFactory;
+import org.apache.roller.weblogger.pojos.TaskLock;
+
+
+/**
+ * Manages scheduling of periodic tasks.
+ * 
+ * This scheduler is meant to be run on a single thread and once started it 
will
+ * run continuously until the thread is interrupted.  The basic logic of the
+ * scheduler is to accept some number of tasks to be run and once per minute
+ * the scheduler will launch any tasks that need to be executed.  
+ * 
+ * Tasks are executed each on their own thread, so this scheduler does not run
+ * serially like a TimerTask.  The threads used for running tasks are managed
+ * by an instance of a ThreadPoolExecutor.
+ */
+public class TaskScheduler implements Runnable {
+    
+    private static Log log = LogFactory.getLog(TaskScheduler.class);
+    
+    private static final long ONE_MINUTE_MS = (60 * 1000);
+    
+    private final ExecutorService pool;
+    private List<RollerTask> tasks = new ArrayList<RollerTask>();
+    
+    
+    public TaskScheduler() {
+        // use an expanding thread executor pool
+        pool = Executors.newCachedThreadPool();
+    }
+    
+    
+    // TODO: this should probably be a constructor arg so the list can be final
+    public void scheduleTask(RollerTask task) {
+        tasks.add(task);
+    }
+    
+    
+    public void run() {
+        
+        boolean firstRun = true;
+        
+        // run forever, or until we get interrupted
+        while(true) {
+            try {
+                // get current time (from db?)
+                Date now = new Date();
+                log.debug("Current time = "+now);
+                
+                // run tasks, skip run on first pass
+                if(firstRun) {
+                    // add a slight delay to scheduler start
+                    Calendar cal = Calendar.getInstance();
+                    cal.setTime(now);
+                    cal.add(Calendar.MINUTE, 1);
+                    cal.set(Calendar.SECOND, cal.getMinimum(Calendar.SECOND));
+                    cal.set(Calendar.MILLISECOND, 
cal.getMinimum(Calendar.MILLISECOND));
+                    now = cal.getTime();
+                    log.debug("Start time = "+now);
+                    
+                    firstRun = false;
+                } else {
+                    try {
+                        runTasks(now);
+                    } finally {
+                        // always release session after each pass
+                        WebloggerFactory.getWeblogger().release();
+                    }
+                }
+                
+                // wait 'til next minute
+                // TODO: make sure we don't get a negative value here
+                Date endOfMinute = DateUtil.getEndOfMinute(now);
+                log.debug("sleeping - "+(endOfMinute.getTime() - 
System.currentTimeMillis()));
+                Thread.sleep(endOfMinute.getTime() - 
System.currentTimeMillis());
+                
+            } catch (InterruptedException ex) {
+                // thread interrupted
+                log.debug("Thread interrupted, scheduler is stopping");
+                pool.shutdownNow();
+                break;
+            }
+        }
+        
+    }
+    
+    
+    /**
+     * Run the necessary tasks given a specific currentTime to work from.
+     */
+    private void runTasks(Date currentTime) {
+        
+        log.debug("Started - "+currentTime);
+        
+        ThreadManager tmgr = 
WebloggerFactory.getWeblogger().getThreadManager();
+        
+        for( RollerTask task : tasks ) {
+            try {
+                // get tasklock for the task
+                TaskLock tasklock = tmgr.getTaskLockByName(task.getName());
+                
+                // TODO: check if task is enabled, otherwise skip
+                if(tasklock == null) {
+                    log.debug("SKIPPING task : "+tasklock.getName());
+                    continue;
+                }
+                
+                // first, calculate the next allowed run time for the task
+                // based on when the task was last run
+                Date nextRunTime = 
tasklock.getNextAllowedRun(task.getInterval());
+                log.debug(task.getName()+": next allowed run time = 
"+nextRunTime);
+                
+                // if we missed the last scheduled run time then see when the
+                // most appropriate next run time should be and wait 'til then
+                boolean needToWait = false;
+                if(currentTime.getTime() > (nextRunTime.getTime() + 
ONE_MINUTE_MS)) {
+                    
+                    log.debug("MISSED last run, checking if waiting is 
necessary");
+                    if("startOfDay".equals(task.getStartTimeDesc())) {
+                        // for daily tasks we only run during the first 
+                        // couple minutes of the day
+                        Date startOfDay = DateUtil.getStartOfDay(currentTime);
+                        if(currentTime.getTime() > startOfDay.getTime() + (2 * 
ONE_MINUTE_MS)) {
+                            needToWait = true;
+                            log.debug("WAITING for next reasonable run time");
+                        }
+                    } else if("startOfHour".equals(task.getStartTimeDesc())) {
+                        // for hourly tasks we only run during the first
+                        // couple minutes of the hour
+                        Date startOfHour = 
DateUtil.getStartOfHour(currentTime);
+                        if(currentTime.getTime() > startOfHour.getTime() + (2 
* ONE_MINUTE_MS)) {
+                            needToWait = true;
+                            log.debug("WAITING for next reasonable run time");
+                        }
+                    } else {
+                        // for immediate tasks we just go ahead and run
+                    }
+                }
+                
+                // if we are within 1 minute of run time then execute,
+                // otherwise we do nothing
+                long differential = currentTime.getTime() - 
nextRunTime.getTime();
+                if (differential >= 0 && !needToWait) {
+                    log.debug(task.getName()+": LAUNCHING task");
+                    pool.submit(task);
+                }
+                
+            } catch(Throwable t) {
+                log.warn(task.getName()+": Unhandled exception caught", t);
+            }
+        }
+        
+        log.debug("Finished");
+    }
+    
+}

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ThreadManager.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ThreadManager.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ThreadManager.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ThreadManager.java
 Thu Jul 26 11:04:12 2007
@@ -18,8 +18,9 @@
 
 package org.apache.roller.weblogger.business.runnable;
 
-import java.util.Date;
+import org.apache.roller.weblogger.WebloggerException;
 import org.apache.roller.weblogger.business.InitializationException;
+import org.apache.roller.weblogger.pojos.TaskLock;
 
 
 /**
@@ -55,15 +56,24 @@
     
     
     /**
-     * Schedule task to run at fixed rate.
-     *
-     * @param task The RollerTask to schedule.
-     * @param startTime The Date at which to start the task.
-     * @param long The interval (in minutes) at which the task should run.
+     * Lookup a TaskLock by name.
+     * 
+     * @param name The name of the task.
+     * @return The TaskLock for the task, or null if not found.
+     * @throws WebloggerException If there is an error looking up the TaskLock.
      */
-    public void scheduleFixedRateTimerTask(RollerTask task, Date startTime, 
long intervalMins);
-    
+    public TaskLock getTaskLockByName(String name) throws WebloggerException;
+
     
+    /**
+     * Save a TaskLock.
+     * 
+     * @param tasklock The TaskLock to save.
+     * @throws WebloggerException If there is an error saving the TaskLock.
+     */
+    public void saveTaskLock(TaskLock tasklock) throws WebloggerException;
+
+
     /**
      * Try to register a lease for a given RollerTask.
      *

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ThreadManagerImpl.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ThreadManagerImpl.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ThreadManagerImpl.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/ThreadManagerImpl.java
 Thu Jul 26 11:04:12 2007
@@ -18,11 +18,9 @@
 
 package org.apache.roller.weblogger.business.runnable;
 
-import java.util.Date;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
+import java.util.concurrent.Future;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -35,25 +33,30 @@
  * Manage Roller's thread use.
  */
 @com.google.inject.Singleton
-public class ThreadManagerImpl implements ThreadManager {
+public abstract class ThreadManagerImpl implements ThreadManager {
     
     private static final Log log = LogFactory.getLog(ThreadManagerImpl.class);
     
-    // background task scheduler
-    private final ScheduledExecutorService serviceScheduler;
+    // our own scheduler thread
+    private Thread schedulerThread = null;
     
+    // a simple thread executor
+    private final ExecutorService serviceScheduler;
     
-    protected ThreadManagerImpl() {
+    
+    public ThreadManagerImpl() {
         
         log.info("Instantiating Thread Manager");
         
-        serviceScheduler = Executors.newScheduledThreadPool(10);
+        serviceScheduler = Executors.newCachedThreadPool();
     }
     
+    
     public void initialize() throws InitializationException {
         
-        Date now = new Date();
-        
+        // create scheduler
+        TaskScheduler scheduler = new TaskScheduler();
+                    
         // okay, first we look for what tasks have been enabled
         String tasksStr = WebloggerConfig.getProperty("tasks.enabled");
         String[] tasks = StringUtils.stripAll(StringUtils.split(tasksStr, 
","));
@@ -68,13 +71,8 @@
                     RollerTask task = (RollerTask) taskClass.newInstance();
                     task.init();
                     
-                    Date startTime = task.getStartTime(now);
-                    if(startTime == null || now.after(startTime)) {
-                        startTime = now;
-                    }
-                    
                     // schedule it
-                    scheduleFixedRateTimerTask(task, startTime, 
task.getInterval());
+                    scheduler.scheduleTask(task);
                     
                 } catch (ClassCastException ex) {
                     log.warn("Task does not extend RollerTask class", ex);
@@ -84,18 +82,26 @@
                     log.error("Error instantiating task", ex);
                 }
             }
-        }        
+        }
+        
+        // only start if we aren't already running
+        if (schedulerThread == null && scheduler != null) {
+            log.debug("Starting scheduler thread");
+            schedulerThread = new Thread(scheduler, "Roller Weblogger Task 
Scheduler");
+            schedulerThread.start();
+        }
     }
     
+    
     public void executeInBackground(Runnable runnable)
             throws InterruptedException {
-        ScheduledFuture scheduledTask = serviceScheduler.schedule(runnable, 0, 
TimeUnit.SECONDS);
+        Future task = serviceScheduler.submit(runnable);
     }
     
     
     public void executeInForeground(Runnable runnable)
             throws InterruptedException {
-        ScheduledFuture scheduledTask = serviceScheduler.schedule(runnable, 0, 
TimeUnit.SECONDS);
+        Future task = serviceScheduler.submit(runnable);
         
         // if this task is really meant to be executed within this calling 
thread
         // then we can add a little code here to loop until it realizes the 
task is done
@@ -103,40 +109,43 @@
     }
     
     
-    public void scheduleFixedRateTimerTask(RollerTask task, Date startTime, 
long intervalMins) {
-        
-        if (intervalMins < MIN_RATE_INTERVAL_MINS) {
-            throw new IllegalArgumentException("Interval (" + intervalMins +
-                    ") shorter than minimum allowed (" + 
MIN_RATE_INTERVAL_MINS + ")");
-        }
-        
-        ScheduledFuture scheduledTask = serviceScheduler.scheduleAtFixedRate(
-                task, 
-                startTime.getTime() - System.currentTimeMillis(), 
-                intervalMins * 60 * 1000, 
-                TimeUnit.MILLISECONDS);
-        
-        log.debug("Scheduled "+task.getClass().getName()+" at "+new 
Date(System.currentTimeMillis()+scheduledTask.getDelay(TimeUnit.MILLISECONDS)));
-    }
-    
-    
     public void shutdown() {
         
         log.debug("starting shutdown sequence");
         
         // trigger an immediate shutdown of any backgrounded tasks
         serviceScheduler.shutdownNow();
+        
+        // only stop if we are already running
+        if(schedulerThread != null) {
+            log.debug("Stopping scheduler");
+            schedulerThread.interrupt();
+        }
     }
     
     
     public void release() {
+        // no-op
     }
     
     
+    /**
+     * Default implementation of lease registration, always returns true.
+     * 
+     * Subclasses should override this method if they plan to run in an
+     * environment that supports clustered deployments.
+     */
     public boolean registerLease(RollerTask task) {
         return true;
     }
     
+    
+    /**
+     * Default implementation of lease unregistration, always returns true.
+     * 
+     * Subclasses should override this method if they plan to run in an
+     * environment that supports clustered deployments.
+     */
     public boolean unregisterLease(RollerTask task) {
         return true;
     }

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/TurnoverReferersTask.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/TurnoverReferersTask.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/TurnoverReferersTask.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/business/runnable/TurnoverReferersTask.java
 Thu Jul 26 11:04:12 2007
@@ -60,6 +60,10 @@
         return getAdjustedTime(currentTime, startTimeDesc);
     }
     
+    public String getStartTimeDesc() {
+        return startTimeDesc;
+    }
+    
     public int getInterval() {
         return this.interval;
     }

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/planet/tasks/RefreshRollerPlanetTask.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/planet/tasks/RefreshRollerPlanetTask.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/planet/tasks/RefreshRollerPlanetTask.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/planet/tasks/RefreshRollerPlanetTask.java
 Thu Jul 26 11:04:12 2007
@@ -30,6 +30,7 @@
 import org.apache.roller.planet.business.updater.FeedUpdater;
 import org.apache.roller.planet.business.updater.SingleThreadedFeedUpdater;
 import org.apache.roller.weblogger.WebloggerException;
+import org.apache.roller.weblogger.business.WebloggerFactory;
 import org.apache.roller.weblogger.config.WebloggerConfig;
 
 
@@ -70,6 +71,10 @@
         return getAdjustedTime(currentTime, startTimeDesc);
     }
     
+    public String getStartTimeDesc() {
+        return startTimeDesc;
+    }
+    
     public int getInterval() {
         return this.interval;
     }
@@ -79,6 +84,7 @@
     }
     
     
+    @Override
     public void init() throws WebloggerException {
         
         // get relevant props
@@ -129,6 +135,7 @@
             log.error("ERROR refreshing planet", t);
         } finally {
             // always release
+            WebloggerFactory.getWeblogger().release();
             PlanetFactory.getPlanet().release();
         }
     }

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/planet/tasks/SyncWebsitesTask.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/planet/tasks/SyncWebsitesTask.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/planet/tasks/SyncWebsitesTask.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/planet/tasks/SyncWebsitesTask.java
 Thu Jul 26 11:04:12 2007
@@ -18,23 +18,6 @@
 
 package org.apache.roller.weblogger.planet.tasks;
 
-import java.util.ArrayList;
-import java.util.Date;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Properties;
-import java.util.Set;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.roller.RollerException;
-import org.apache.roller.planet.business.GuicePlanetProvider;
-import org.apache.roller.weblogger.business.runnable.RollerTaskWithLeasing;
-import org.apache.roller.planet.business.PlanetFactory;
-import org.apache.roller.planet.business.PlanetManager;
-import org.apache.roller.planet.business.PlanetProvider;
-import org.apache.roller.planet.business.startup.PlanetStartup;
-import org.apache.roller.weblogger.business.WebloggerFactory;
-import org.apache.roller.weblogger.business.UserManager;
 import org.apache.roller.planet.pojos.Planet;
 import org.apache.roller.planet.pojos.PlanetGroup;
 import org.apache.roller.planet.pojos.Subscription;
@@ -76,6 +59,10 @@
     
     public Date getStartTime(Date currentTime) {
         return getAdjustedTime(currentTime, startTimeDesc);
+    }
+    
+    public String getStartTimeDesc() {
+        return startTimeDesc;
     }
     
     public int getInterval() {

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.hbm.xml
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.hbm.xml?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.hbm.xml
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.hbm.xml
 Thu Jul 26 11:04:12 2007
@@ -55,15 +55,6 @@
         />
 
         <property
-            name="locked"
-            type="boolean"
-            update="true"
-            insert="true"
-            column="islocked"
-            unique="false"
-        />
-
-        <property
             name="timeLeased"
             type="int"
             update="true"

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.java
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.java
 Thu Jul 26 11:04:12 2007
@@ -31,7 +31,6 @@
     
     private String id = UUIDGenerator.generateUUID();
     private String name = null;
-    private boolean locked = false;
     private Date timeAquired = null;
     private int timeLeased = 0;
     private Date lastRun = null;
@@ -42,19 +41,19 @@
     
     
     /**
-     * Calculate the next allowed time the task managed by this lock would
-     * be allowed to run.  i.e. lastRun + interval
+     * Calculate the next allowed time this task is allowed to run allowed to 
run.  
+     * i.e. lastRun + interval
      */
-    public Date getNextRun(int interval) {
+    public Date getNextAllowedRun(int interval) {
         
-        Date lastRun = this.getLastRun();
-        if(lastRun == null) {
-            return null;
+        Date previousRun = getLastRun();
+        if(previousRun == null) {
+            return new Date(0);
         }
         
         // calculate next run time
         Calendar cal = Calendar.getInstance();
-        cal.setTime(lastRun);
+        cal.setTime(previousRun);
         cal.add(Calendar.MINUTE, interval);
         
         return cal.getTime();
@@ -62,18 +61,21 @@
     
     
     /**
-     * Get the time the lease for this lock will expire, or null if this task
-     * lock is not currently locked.
+     * Get the time the last/current lease for this lock expires.
+     * 
+     * expireTime = timeAcquired + (timeLeased * 60sec/min) - 1 sec
+     * we remove 1 second to adjust for precision differences
      */
-    public Date getLeaseExpires() {
+    public Date getLeaseExpiration() {
         
-        if(!locked || timeAquired == null) {
-            return null;
+        Date leaseAcquisitionTime = new Date(0);
+        if(getTimeAquired() != null) {
+            leaseAcquisitionTime = getTimeAquired();
         }
         
         // calculate lease expiration time
         Calendar cal = Calendar.getInstance();
-        cal.setTime(timeAquired);
+        cal.setTime(leaseAcquisitionTime);
         cal.add(Calendar.MINUTE, timeLeased);
         
         return cal.getTime();
@@ -81,6 +83,7 @@
 
     //------------------------------------------------------- Good citizenship
 
+    @Override
     public String toString() {
         StringBuffer buf = new StringBuffer();
         buf.append("{");
@@ -92,6 +95,7 @@
         return buf.toString();
     }
 
+    @Override
     public boolean equals(Object other) {
         
         if(this == other) return true;
@@ -102,6 +106,7 @@
         return this.getName().equals(that.getName());
     }
     
+    @Override
     public int hashCode() {
         // our natrual key, or business key, is our name
         return this.getName().hashCode();
@@ -142,26 +147,7 @@
     public void setLastRun(Date lastRun) {
         this.lastRun = lastRun;
     }
-
     
-    public boolean isLocked() {
-        
-        // this method requires a little extra logic because we return false
-        // even if a task is locked when it's lease has expired
-        if(!locked) {
-            return false;
-        }
-        
-        Date now = new Date();
-        Date leaseExpiration = this.getLeaseExpires();
-        
-        return now.before(leaseExpiration);
-    }
-
-    public void setLocked(boolean locked) {
-        this.locked = locked;
-    }
-
     
     public int getTimeLeased() {
         return timeLeased;

Modified: 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.orm.xml
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.orm.xml?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.orm.xml
 (original)
+++ 
roller/trunk/apps/weblogger/src/java/org/apache/roller/weblogger/pojos/TaskLock.orm.xml
 Thu Jul 26 11:04:12 2007
@@ -19,8 +19,8 @@
             <query>SELECT t FROM TaskLock t WHERE t.name = ?1</query>
         </named-query>
         
-        <named-query 
name="TaskLock.updateClient&amp;Timeacquired&amp;TimeleasedByName&amp;Timeacquired">
-            <query> UPDATE TaskLock t SET t.clientId=?1, t.timeAquired= 
CURRENT_TIMESTAMP, t.timeLeased= ?2 WHERE t.name=?3 AND t.timeAquired=?4 AND ?5 
&lt; CURRENT_TIMESTAMP</query>
+        <named-query 
name="TaskLock.updateClient&amp;Timeacquired&amp;Timeleased&amp;LastRunByName&amp;Timeacquired">
+            <query> UPDATE TaskLock t SET t.clientId=?1, t.timeAquired= 
CURRENT_TIMESTAMP, t.timeLeased= ?2, t.lastRun= ?3 WHERE t.name=?4 AND 
t.timeAquired=?5 AND ?6 &lt; CURRENT_TIMESTAMP</query>
         </named-query>
         
         <named-query name="TaskLock.updateTimeLeasedByName&amp;Client">
@@ -42,17 +42,12 @@
                 <column name="lastrun" insertable="true" updatable="true" 
unique="false"/>
                 <temporal>TIMESTAMP</temporal>
             </basic>
-            <basic name="locked">
-                <column name="islocked" insertable="true" updatable="true" 
unique="false"/>
-            </basic>
             <basic name="timeLeased">
                 <column name="timeleased" insertable="true" updatable="true" 
unique="false"/>
             </basic>
             <basic name="clientId">
                 <column name="client" insertable="true" updatable="true" 
unique="false"/>
             </basic>
-            
-            <transient name="leaseExpires" />
             
         </attributes>
     </entity>

Modified: 
roller/trunk/apps/weblogger/test/java/org/apache/roller/weblogger/business/TaskLockTest.java
URL: 
http://svn.apache.org/viewvc/roller/trunk/apps/weblogger/test/java/org/apache/roller/weblogger/business/TaskLockTest.java?view=diff&rev=559911&r1=559910&r2=559911
==============================================================================
--- 
roller/trunk/apps/weblogger/test/java/org/apache/roller/weblogger/business/TaskLockTest.java
 (original)
+++ 
roller/trunk/apps/weblogger/test/java/org/apache/roller/weblogger/business/TaskLockTest.java
 Thu Jul 26 11:04:12 2007
@@ -86,6 +86,7 @@
         public String getName() { return "TestTask"; }
         public String getClientId() { return "TestTaskClientId"; }
         public Date getStartTime(Date current) { return current; }
+        public String getStartTimeDesc() { return "immediate"; }
         public int getLeaseTime() { return 300; }
         public int getInterval() { return 1800; }
         public void runTask() { }


Reply via email to