// Added: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java
// URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java?rev=781093&view=auto
// --- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java (added)
// +++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/CronScheduler.java Tue Jun 2 17:42:46 2009
package org.apache.ode.bpel.engine.cron;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;

import javax.xml.namespace.QName;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.engine.Contexts;
import org.apache.ode.bpel.engine.BpelServerImpl.ContextsAware;
import org.apache.ode.bpel.iapi.ClusterAware;
import org.apache.ode.bpel.iapi.ProcessConf;
import org.apache.ode.bpel.iapi.ProcessConf.CronJob;
import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;
import org.apache.ode.utils.CronExpression;

/**
 * Schedules cron-style jobs on a daemon {@link Timer} and runs them on an
 * {@link ExecutorService}.
 *
 * <p>Two families of jobs are managed: PROCESS cron jobs, tracked per process id in
 * {@code _terminationListenersByPid}, and SYSTEM cron jobs, tracked in
 * {@code _systemTerminationListeners}. Every scheduled job is represented by a
 * {@link TerminationListener}; calling {@code terminate()} on it stops the job from
 * running and from re-scheduling itself.
 *
 * <p>A job is a self-perpetuating chain: the timer fires once, the job is submitted to the
 * executor, and when the job finishes it schedules its own next firing.
 */
public class CronScheduler {
    static final Log __log = LogFactory.getLog(CronScheduler.class);

    // Lead time (ms) added to "now" before asking the cron expression for its next firing
    // time. It is currently 0, i.e. no minimum interval is enforced.
    private static final long MIN_INTERVAL = 0;
    // If the work is scheduled too late from the due time, skip it. 0 disables the check.
    private static final long TOLERABLE_SCHEDULE_DELAY = 0;

    private ExecutorService _scheduledTaskExec;

    private Contexts _contexts;

    private final Timer _schedulerTimer = new Timer("CronScheduler", true);

    private final Collection<TerminationListener> _systemTerminationListeners = new ArrayList<TerminationListener>();

    private final Map<QName, Collection<TerminationListener>> _terminationListenersByPid = new HashMap<QName, Collection<CronScheduler.TerminationListener>>();

    // Guards cancelling/scheduling of PROCESS cron jobs. A dedicated monitor is used instead
    // of the QName argument: two equal QName instances are different monitors and would not
    // exclude each other.
    private final Object _processCronLock = new Object();

    private volatile boolean _shuttingDown = false;

    /** Sets the executor on which the cron jobs themselves are run. */
    public void setScheduledTaskExec(ExecutorService taskExec) {
        _scheduledTaskExec = taskExec;
    }

    /** Sets the engine contexts handed to the jobs and used for the coordinator check. */
    public void setContexts(Contexts _contexts) {
        this._contexts = _contexts;
    }

    /** Marks the scheduler as shutting down and cancels the timer; pending firings are dropped. */
    public void shutdown() {
        _shuttingDown = true;
        _schedulerTimer.cancel();
    }

    /**
     * Terminates every PROCESS cron job registered for the given process.
     *
     * @param pid        id of the process whose jobs are cancelled; must not be null
     * @param undeployed when true the pid is also removed from the registry; when false the
     *                   pid stays registered (with no listeners) so that it remains in the
     *                   exclusion set used by the SYSTEM cron cleanup
     */
    public void cancelProcessCronJobs(QName pid, boolean undeployed) {
        assert pid != null;

        if( __log.isInfoEnabled() ) __log.info("Cancelling PROCESS CRON jobs for: " + pid);
        Collection<TerminationListener> listenersToTerminate = new ArrayList<TerminationListener>();

        synchronized( _terminationListenersByPid ) {
            Collection<TerminationListener> listeners = _terminationListenersByPid.get(pid);
            if( listeners != null ) {
                listenersToTerminate.addAll(listeners);
                // Drop the listeners that are about to be terminated; otherwise the list
                // grows by one generation of dead listeners on every re-schedule.
                listeners.clear();
            }
            if( undeployed ) {
                _terminationListenersByPid.remove(pid);
            }
        }

        // terminate existing cron jobs if there are
        synchronized( _processCronLock ) {
            for( TerminationListener listener : listenersToTerminate ) {
                listener.terminate();
            }
        }
    }

    /**
     * Replaces the PROCESS cron jobs of a process with the ones declared in its configuration.
     * Does nothing while the scheduler is shutting down.
     *
     * @param pid   id of the process; must not be null
     * @param pconf process configuration supplying the cron jobs
     */
    public void scheduleProcessCronJobs(QName pid, ProcessConf pconf) {
        if( _shuttingDown ) {
            return;
        }
        assert pid != null;

        // Cancel, schedule and register under one lock so that two concurrent calls cannot
        // interleave and leave two generations of jobs running.
        synchronized( _processCronLock ) {
            cancelProcessCronJobs(pid, false);
            Collection<TerminationListener> newListeners = new ArrayList<TerminationListener>();

            if( __log.isInfoEnabled() ) __log.info("Scheduling PROCESS CRON jobs for: " + pid);

            // start new cron jobs
            for( final CronJob job : pconf.getCronJobs() ) {
                if( __log.isDebugEnabled() ) __log.debug("Scheduling PROCESS CRON job: " + job.getCronExpression() + " for: " + pid);
                newListeners.add(schedule(job.getCronExpression(), newProcessCleanupRunnable(job), null, null));
            }

            // make sure the pid does not get into the terminationListener map if no cron is setup
            if( !newListeners.isEmpty() ) {
                synchronized( _terminationListenersByPid ) {
                    Collection<TerminationListener> oldListeners = _terminationListenersByPid.get(pid);
                    if( oldListeners == null ) {
                        _terminationListenersByPid.put(pid, newListeners);
                    } else {
                        oldListeners.addAll(newListeners);
                    }
                }
            }
        }
    }

    /**
     * Re-reads the SYSTEM cron configuration and replaces all SYSTEM cron jobs with it.
     * If reading the configuration fails, the error is logged and the existing jobs are kept.
     * Does nothing while the scheduler is shutting down.
     *
     * @param systemSchedulesConf source of the system cron jobs
     */
    public void refreshSystemCronJobs(SystemSchedulesConfig systemSchedulesConf) {
        if( _shuttingDown ) {
            return;
        }

        synchronized( _systemTerminationListeners) {
            if( __log.isInfoEnabled() ) __log.info("Refreshing SYSTEM CRON jobs.");

            try {
                // if error thrown on reading the schedules.xml, do not cancel existing cron jobs
                List<CronJob> systemCronJobs = systemSchedulesConf.getSystemCronJobs();

                // cancel cron jobs
                for( TerminationListener listener : _systemTerminationListeners ) {
                    listener.terminate();
                }
                _systemTerminationListeners.clear();

                // start new cron jobs
                for( final CronJob job : systemCronJobs ) {
                    if( __log.isDebugEnabled() ) __log.debug("Scheduling SYSTEM CRON job:" + job);
                    _systemTerminationListeners.add(schedule(job.getCronExpression(), newSystemCleanupRunnable(job), null, null));
                }
            } catch( Exception e ) {
                __log.error("Error during refreshing SYSTEM CRON schedules: ", e);
            }
        }
    }

    /**
     * Schedules {@code runnable} for the next firing time of {@code cronExpression}. After each
     * run the job schedules itself again until it is terminated or the scheduler shuts down.
     *
     * @param cronExpression      expression that yields the firing times; must not be null
     * @param runnable            the work to run; must not be null
     * @param runnableDetails     optional details restored into a {@link MapSerializableRunnable}
     *                            before each run; may be null
     * @param terminationListener listener to reuse for the job; when null a new one is created
     * @return the listener that terminates the job; a no-op listener if the scheduler is
     *         already shutting down
     * @throws IllegalStateException if the timer rejects the task while not shutting down
     */
    public TerminationListener schedule(final CronExpression cronExpression,
            final Runnable runnable, final Map<String, Object> runnableDetails,
            TerminationListener terminationListener) {
        if( _shuttingDown ) {
            __log.info("CRON Scheduler is being shut down. This new scheduling request is ignored.");
            return new TerminationListener() {
                public void terminate() {
                    // do nothing
                }
            };
        }

        assert cronExpression != null;
        assert runnable != null;

        final Date nextScheduleTime = cronExpression.getNextValidTimeAfter(new Date(
                System.currentTimeMillis() + MIN_INTERVAL));
        final CronScheduledJob job = new CronScheduledJob(nextScheduleTime, runnable, runnableDetails, cronExpression, terminationListener);
        if( nextScheduleTime == null ) {
            // Defensive: an expression with no future firing time would otherwise cause a
            // NullPointerException below. Nothing is left to schedule for it.
            __log.warn("CRON expression " + cronExpression + " has no further execution time; nothing is scheduled.");
            return job.terminationListener;
        }
        if( __log.isDebugEnabled() ) __log.debug("CRON will run in " + (nextScheduleTime.getTime() - System.currentTimeMillis()) + "ms.");

        try {
            _schedulerTimer.schedule(new TimerTask() {
                @Override
                public void run() {
                    try {
                        if( __log.isDebugEnabled() ) __log.debug("Cron scheduling timer kicked in: " + cronExpression);
                        // run only if the current node is the coordinator,
                        // with the SimpleScheduler, the node is always the coordinator
                        // NOTE(review): when this node is not the coordinator the job is not
                        // submitted, and since re-scheduling only happens after a job ran,
                        // the chain appears to end here -- confirm this is intended.
                        if( !(_contexts.scheduler instanceof ClusterAware)
                                || ((ClusterAware)_contexts.scheduler).amICoordinator() ) {
                            // do not hold the timer thread too long, submit the work to an executorService
                            _scheduledTaskExec.submit(job);
                            if( __log.isDebugEnabled() ) __log.debug("CRON job scheduled " + runnable);
                        }
                    } catch( RuntimeException e ) {
                        // An exception escaping a TimerTask terminates the shared timer
                        // thread, which would silently stop every other cron job as well.
                        if( _shuttingDown ) {
                            __log.info("CRON job could not be submitted during shutdown: " + e.getMessage());
                        } else {
                            __log.error("Error submitting CRON job: " + runnable, e);
                        }
                    }
                }
            }, nextScheduleTime);
        } catch( IllegalStateException ise ) {
            if( _shuttingDown ) {
                __log.info("CRON Scheduler is being shut down. This new scheduling request is ignored.");
            } else {
                throw ise;
            }
        }

        return job.terminationListener;
    }

    /** Builds the runnable that executes every runtime data cleanup of a PROCESS cron job. */
    private Runnable newProcessCleanupRunnable(final CronJob job) {
        // for each different scheduled time
        return new Runnable() {
            public void run() {
                if( __log.isDebugEnabled() ) __log.debug("Running cron cleanup with details list size: " + job.getRunnableDetailList().size());
                for( Map<String, Object> details : job.getRunnableDetailList() ) {
                    try {
                        // for each clean up for the scheduled time
                        RuntimeDataCleanupRunnable cleanup = new RuntimeDataCleanupRunnable();
                        cleanup.restoreFromDetailsMap(details);
                        cleanup.setContexts(_contexts);
                        cleanup.run();
                        if( __log.isDebugEnabled() ) __log.debug("Finished running runtime data cleanup from a PROCESS CRON job: " + cleanup);
                    } catch(Exception re) {
                        __log.error("Error during runtime data cleanup from a PROCESS CRON: " + details + "; check your cron settings in deploy.xml.", re);
                        // don't sweat.. the rest of the system and other cron jobs still should work
                    }
                }
            }
        };
    }

    /**
     * Builds the runnable that executes every runtime data cleanup of a SYSTEM cron job,
     * excluding the processes that currently have PROCESS cron jobs registered.
     */
    private Runnable newSystemCleanupRunnable(final CronJob job) {
        // for each different scheduled time
        return new Runnable() {
            public void run() {
                for( Map<String, Object> details : job.getRunnableDetailList() ) {
                    try {
                        // for now, we have only runtime data cleanup cron job defined
                        // for each clean up for the scheduled time
                        RuntimeDataCleanupRunnable cleanup = new RuntimeDataCleanupRunnable();

                        // Take a snapshot of the registered pids while holding the lock. The
                        // live keySet() view must not leave the synchronized block because it
                        // is iterated later without the lock.
                        Set<QName> pidsToExclude;
                        synchronized( _terminationListenersByPid ) {
                            pidsToExclude = new HashSet<QName>(_terminationListenersByPid.keySet());
                        }
                        // Work on a per-run copy so the job's shared details map is never
                        // mutated and cannot carry a stale exclusion set into a later run.
                        Map<String, Object> runDetails = new HashMap<String, Object>(details);
                        if( pidsToExclude.isEmpty() ) {
                            runDetails.remove("pidsToExclude");
                        } else {
                            runDetails.put("pidsToExclude", pidsToExclude);
                        }

                        cleanup.restoreFromDetailsMap(runDetails);
                        cleanup.setContexts(_contexts);
                        cleanup.run();
                        if( __log.isDebugEnabled() ) __log.debug("Finished running runtime data cleanup from a SYSTEM CRON job:" + cleanup);
                    } catch( Exception e ) {
                        __log.error("Error running a runtime data cleanup from a SYSTEM CRON job: " + details + "; check your system cron setup.", e);
                    }
                }
            }
        };
    }

    /** Handle used to stop a scheduled cron job. */
    public interface TerminationListener {
        void terminate();
    }

    /**
     * One firing of a cron job. Runs the wrapped runnable and, unless terminated or shutting
     * down, schedules the next firing with the same termination listener.
     */
    private class CronScheduledJob implements Callable<TerminationListener> {
        private volatile boolean terminated = false;
        private Date nextScheduleTime;
        private Runnable runnable;
        private Map<String, Object> runnableDetails;
        private CronExpression cronExpression;
        private TerminationListener terminationListener;

        public CronScheduledJob(Date nextScheduleTime,
                Runnable runnable, Map<String, Object> runnableDetails,
                CronExpression cronExpression, TerminationListener terminationListener) {
            this.nextScheduleTime = nextScheduleTime;
            this.runnable = runnable;
            this.runnableDetails = runnableDetails;
            this.cronExpression = cronExpression;
            if( terminationListener == null ) {
                // No listener supplied: create one that flips this job's terminated flag.
                terminationListener = new TerminationListener() {
                    public void terminate() {
                        terminated = true;
                    }
                };
            }
            this.terminationListener = terminationListener;
        }

        public TerminationListener call() throws Exception {
            try {
                // NOTE(review): with a non-zero TOLERABLE_SCHEDULE_DELAY this comparison looks
                // like it would always hold for a job whose due time has passed; it has no
                // effect while the constant is 0 -- revisit before enabling the delay check.
                if( TOLERABLE_SCHEDULE_DELAY == 0 ||
                    nextScheduleTime.getTime() < System.currentTimeMillis() + TOLERABLE_SCHEDULE_DELAY) {
                    if( runnableDetails != null &&
                            runnable instanceof MapSerializableRunnable ) {
                        ((MapSerializableRunnable)runnable).restoreFromDetailsMap(runnableDetails);
                    }
                    if (runnable instanceof ContextsAware) {
                        ((ContextsAware) runnable).setContexts(_contexts);
                    }
                    if( !_shuttingDown && !terminated ) {
                        if( __log.isDebugEnabled() ) __log.debug("Running CRON job: " + runnable + " for " + nextScheduleTime.getTime());
                        runnable.run();
                    }
                } else {
                    // ignore the scheduling.. it will be scheduled later
                }
            } catch( Exception e ) {
                if( _shuttingDown ) {
                    __log.info("A cron job threw an Exception during ODE shutdown: " + e.getMessage() + ", you can ignore the error.");
                } else {
                    // The Future returned by submit() is never inspected, so log here or the
                    // failure would vanish without a trace.
                    __log.error("Exception during running cron scheduled job: " + runnable, e);
                    if( e instanceof RuntimeException ) {
                        throw e;
                    }
                    throw new RuntimeException("Exception during running cron scheduled job: " + runnable, e);
                }
            } finally {
                // Keep the chain alive: schedule the next firing with the same listener.
                if( !_shuttingDown && !terminated ) {
                    schedule(cronExpression, runnable, runnableDetails, terminationListener);
                }
            }

            return terminationListener;
        }
    }
}
// Added: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java
// URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java?rev=781093&view=auto
// --- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java (added)
// +++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/RuntimeDataCleanupRunnable.java Tue Jun 2 17:42:46 2009
package org.apache.ode.bpel.engine.cron;

import java.util.Map;
import java.util.Set;
import java.util.concurrent.Callable;

import javax.xml.namespace.QName;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.common.InstanceFilter;
import org.apache.ode.bpel.dao.BpelDAOConnection;
import org.apache.ode.bpel.dao.FilteredInstanceDeletable;
import org.apache.ode.bpel.engine.Contexts;
import org.apache.ode.bpel.engine.BpelServerImpl.ContextsAware;
import org.apache.ode.bpel.iapi.ProcessConf.CLEANUP_CATEGORY;
import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
import org.apache.ode.bpel.iapi.Scheduler.MapSerializableRunnable;

/**
 * Deletes runtime data of process instances that match the filters of a {@link CleanupInfo}.
 *
 * <p>The runnable is configured from a details map (see {@link #restoreFromDetailsMap(Map)})
 * and deletes instances in batches of at most {@code transactionSize}, one transaction per
 * batch, until a batch comes back smaller than the batch size.
 */
public class RuntimeDataCleanupRunnable implements MapSerializableRunnable, ContextsAware {
    // Static so that the logger is not part of the instance state of this runnable.
    private static final Log _log = LogFactory.getLog(RuntimeDataCleanupRunnable.class);

    private static final long serialVersionUID = 1L;

    private transient Contexts _contexts;

    private int _transactionSize;
    private CleanupInfo _cleanupInfo;
    private QName _pid;
    private Set<QName> _pidsToExclude;

    public RuntimeDataCleanupRunnable() {
    }

    /**
     * Configures this runnable from the given details.
     *
     * @param details map read with the keys {@code "cleanupInfo"} ({@link CleanupInfo}),
     *                {@code "transactionSize"} ({@link Integer}, required), {@code "pid"}
     *                ({@link QName}, optional) and {@code "pidsToExclude"}
     *                ({@code Set<QName>}, optional)
     * @throws IllegalArgumentException if {@code "transactionSize"} is missing
     */
    @SuppressWarnings("unchecked")
    public void restoreFromDetailsMap(Map<String, Object> details) {
        _cleanupInfo = (CleanupInfo)details.get("cleanupInfo");
        Integer transactionSize = (Integer)details.get("transactionSize");
        if( transactionSize == null ) {
            // Fail with a readable message instead of a bare NPE from auto-unboxing.
            throw new IllegalArgumentException("Missing 'transactionSize' in runtime data cleanup details: " + details);
        }
        _transactionSize = transactionSize.intValue();
        _pid = (QName)details.get("pid");
        _pidsToExclude = (Set<QName>)details.get("pidsToExclude");
    }

    public void storeToDetailsMap(Map<String, Object> details) {
        // we don't serialize
    }

    public void setContexts(Contexts contexts) {
        _contexts = contexts;
    }

    /** Runs the cleanup for every filter of the configured {@link CleanupInfo}. */
    public void run() {
        _log.info("CleanInstances.run().");
        for( String filter : _cleanupInfo.getFilters() ) {
            _log.info("CleanInstances.run(" + filter + ")");
            int numberOfDeletedInstances;
            do {
                numberOfDeletedInstances = cleanInstances(filter, _cleanupInfo.getCategories(), _transactionSize);
                // A full batch means there may be more to delete. The size guard prevents an
                // endless loop when the batch size is 0 and nothing is deleted.
            } while( _transactionSize > 0 && numberOfDeletedInstances == _transactionSize );
        }
    }

    /**
     * Deletes, in one transaction, up to {@code limit} instances that match {@code filter}.
     * The filter is narrowed to the configured pid, or extended to skip the excluded pids.
     *
     * @param filter     instance filter expression
     * @param categories cleanup categories handed to the DAO
     * @param limit      maximum number of instances to delete in this call
     * @return the value returned by the DAO's delete call; 0 when no scheduler is available
     *         or the DAO connection does not support filtered deletion
     */
    int cleanInstances(String filter, final Set<CLEANUP_CATEGORY> categories, int limit) {
        if( _pid != null ) {
            filter += " pid=" + _pid;
        } else if( _pidsToExclude != null && !_pidsToExclude.isEmpty() ) {
            // An empty exclusion set must not produce a dangling "pid<>" clause.
            StringBuilder pids = new StringBuilder();
            for( QName pid : _pidsToExclude ) {
                if( pids.length() > 0 ) {
                    pids.append("|");
                }
                pids.append(pid);
            }
            filter += " pid<>" + pids.toString();
        }

        if( _log.isDebugEnabled() ) _log.debug("CleanInstances using filter: " + filter + ", limit: " + limit);

        final InstanceFilter instanceFilter = new InstanceFilter(filter, "", limit);
        try {
            if( _contexts.scheduler != null ) {
                return _contexts.execTransaction(new Callable<Integer>() {
                    public Integer call() throws Exception {
                        BpelDAOConnection con = _contexts.dao.getConnection();
                        if( con instanceof FilteredInstanceDeletable ) {
                            return ((FilteredInstanceDeletable)con).deleteInstances(instanceFilter, categories);
                        }
                        return 0;
                    }
                });
            } else {
                return 0;
            }
        } catch (RuntimeException re) {
            throw re;
        } catch (Exception e) {
            throw new RuntimeException("Exception while deleting instances: ", e);
        }
    }
}
// Added: ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java?rev=781093&view=auto
// ==============================================================================
// --- ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java (added)
// +++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/engine/cron/SystemSchedulesConfig.java Tue Jun 2 17:42:46 2009
package org.apache.ode.bpel.engine.cron;

import java.io.File;
import java.text.ParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.ode.bpel.schedules.SchedulesDocument;
import org.apache.ode.bpel.schedules.TSchedule;
import org.apache.ode.bpel.dd.TCleanup;
import org.apache.ode.bpel.iapi.ContextException;
import org.apache.ode.bpel.iapi.ProcessConf.CleanupInfo;
import org.apache.ode.bpel.iapi.ProcessConf.CronJob;
import org.apache.ode.store.ProcessCleanupConfImpl;
import org.apache.ode.utils.CronExpression;
import org.apache.xmlbeans.XmlOptions;

/**
 * Reads the SYSTEM cron job definitions from the schedules file.
 *
 * <p>The file is the one named by the system property {@link #SCHEDULE_CONFIG_FILE_PROP_KEY}
 * when that property is set, otherwise {@code schedules.xml} under the configuration root.
 */
public class SystemSchedulesConfig {
    private final static Log __log = LogFactory.getLog(SystemSchedulesConfig.class);

    public final static String SCHEDULE_CONFIG_FILE_PROP_KEY = "org.apache.ode.scheduleConfigFile";

    // Batch size put under "transactionSize" for every system cleanup.
    private final static int CLEANUP_TRANSACTION_SIZE = 10;

    private final File schedulesFile;

    /**
     * @param configRoot directory that holds {@code schedules.xml}; only consulted (and then
     *                   must not be null) when no custom location is set via the system property
     */
    public SystemSchedulesConfig(File configRoot) {
        String scheduleConfigFile = System.getProperty(SCHEDULE_CONFIG_FILE_PROP_KEY);
        if( scheduleConfigFile != null ) {
            schedulesFile = new File(scheduleConfigFile);
            if( !schedulesFile.exists() ) {
                __log.warn("A custom location for schedules has been set. However, the file does not exist at the location: "
                        + schedulesFile.getAbsolutePath() + ". The file will be read when one gets created.");
            }
        } else {
            assert configRoot != null;
            schedulesFile = new File(configRoot, "schedules.xml");
        }
        __log.info("SYSTEM CRON configuration: " + schedulesFile.getAbsolutePath());
    }

    public File getSchedulesFile() {
        return schedulesFile;
    }

    /**
     * Returns the cron jobs defined in the system schedules file. This call re-reads the file
     * and returns a fresh snapshot; the list is empty when the file does not exist.
     * A schedule whose cron expression cannot be parsed is logged and skipped.
     *
     * @return the list of cron jobs
     * @throws ContextException if the schedules file exists but cannot be read
     */
    public List<CronJob> getSystemCronJobs() {
        List<CronJob> jobs = new ArrayList<CronJob>();

        if( schedulesFile != null && schedulesFile.exists() ) {
            for(TSchedule schedule : getSystemSchedulesDocument().getSchedules().getScheduleList()) {
                CronJob job = new CronJob();
                try {
                    job.setCronExpression(new CronExpression(schedule.getWhen()));
                    for(final TCleanup aCleanup : schedule.getCleanupList()) {
                        CleanupInfo cleanupInfo = new CleanupInfo();
                        assert !aCleanup.getFilterList().isEmpty();
                        cleanupInfo.setFilters(aCleanup.getFilterList());
                        ProcessCleanupConfImpl.processACleanup(cleanupInfo.getCategories(), aCleanup.getCategoryList());

                        // One details map per cleanup; it is later handed to the cleanup
                        // runnable that the cron scheduler creates for this job.
                        Map<String, Object> runnableDetails = new HashMap<String, Object>();
                        runnableDetails.put("cleanupInfo", cleanupInfo);
                        runnableDetails.put("transactionSize", CLEANUP_TRANSACTION_SIZE);
                        job.getRunnableDetailList().add(runnableDetails);
                        __log.info("SYSTEM CRON configuration added a runtime data cleanup: " + runnableDetails);
                    }
                    jobs.add(job);
                } catch( ParseException pe ) {
                    __log.error("Exception during parsing the schedule cron expression: " + schedule.getWhen() + ", skipped the scheduled job.", pe);
                }
            }
        }

        __log.info("SYSTEM CRON configuration found cron jobs: " + jobs);
        return jobs;
    }

    /**
     * Parses the schedules file.
     *
     * @throws ContextException wrapping any failure to read or parse the file
     */
    private SchedulesDocument getSystemSchedulesDocument() {
        try {
            XmlOptions options = new XmlOptions();
            // Namespace substitution applied while loading: elements in the first namespace
            // are read as if they were in the second.
            Map<String, String> otherNs = new HashMap<String, String>();
            otherNs.put("http://ode.fivesight.com/schemas/2006/06/27/dd",
                    "http://www.apache.org/ode/schemas/schedules/2009/05");
            options.setLoadSubstituteNamespaces(otherNs);
            return SchedulesDocument.Factory.parse(schedulesFile, options);
        } catch (Exception e) {
            throw new ContextException("Couldn't read schedule descriptor at location "
                    + schedulesFile.getAbsolutePath(), e);
        }
    }
}
// Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java?rev=781093&r1=781092&r2=781093&view=diff ============================================================================== --- ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java (original) +++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/BpelDAOConnectionImpl.java Tue Jun 2 17:42:46 2009 @@ -18,6 +18,7 @@ */ package org.apache.ode.bpel.memdao; +import java.io.Serializable; import java.util.*; import java.util.concurrent.atomic.AtomicLong; @@ -68,6 +69,12 @@ return _store.get(processId); } + public ProcessDAO createTransientProcess(Serializable id) { + ProcessDaoImpl process = new ProcessDaoImpl(this, _store, null, null, (String)id, 0); + + return process; + } + public ProcessDAO createProcess(QName pid, QName type, String guid, long version) { ProcessDaoImpl process = new ProcessDaoImpl(this, _store, pid, type, guid, version); _store.put(pid, process); @@ -83,6 +90,7 @@ return null; } + @SuppressWarnings("unchecked") public Collection<ProcessInstanceDAO> instanceQuery(InstanceFilter filter) { if (filter.getLimit() == 0) { return Collections.EMPTY_LIST; Modified: ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java URL: http://svn.apache.org/viewvc/ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java?rev=781093&r1=781092&r2=781093&view=diff
============================================================================== --- ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java (original) +++ ode/trunk/engine/src/main/java/org/apache/ode/bpel/memdao/ProcessDaoImpl.java Tue Jun 2 17:42:46 2009 @@ -172,7 +172,7 @@ } } - public void delete() { + public void deleteProcessAndRoutes() { _store.remove(_processId); } Modified: ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java URL: http://svn.apache.org/viewvc/ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java?rev=781093&r1=781092&r2=781093&view=diff ============================================================================== --- ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java (original) +++ ode/trunk/il-common/src/main/java/org/apache/ode/il/MockScheduler.java Tue Jun 2 17:42:46 2009 @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.Callable; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -88,6 +89,21 @@ return new GUID().toString(); } + public String scheduleMapSerializableRunnable(final MapSerializableRunnable runnable, final Date when) throws ContextException { + registerSynchronizer(new Synchronization() { + public void afterCompletion(int status) { + _timer.schedule(new TimerTask() { + public void run() { + runnable.run(); + } + }, when != null ? 
when : new Date()); + } + public void beforeCompletion() { } + }); + + return null; + } + public void cancelJob(String arg0) throws ContextException { } Modified: ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java URL: http://svn.apache.org/viewvc/ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java?rev=781093&r1=781092&r2=781093&view=diff ============================================================================== --- ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java (original) +++ ode/trunk/scheduler-simple/src/main/java/org/apache/ode/scheduler/simple/SimpleScheduler.java Tue Jun 2 17:42:46 2009 @@ -175,11 +175,27 @@ if (__log.isDebugEnabled()) __log.debug("scheduling " + jobDetail + " for " + when); - boolean immediate = when.getTime() <= ctime + _immediateInterval; - boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval; + return schedulePersistedJob(new Job(when.getTime(), true, jobDetail), when, ctime); + } + + public String scheduleMapSerializableRunnable(MapSerializableRunnable runnable, Date when) throws ContextException { + long ctime = System.currentTimeMillis(); + if (when == null) + when = new Date(ctime); + + Map<String, Object> jobDetails = new HashMap<String, Object>(); + jobDetails.put("runnable", runnable); + runnable.storeToDetailsMap(jobDetails); + + if (__log.isDebugEnabled()) + __log.debug("scheduling " + jobDetails + " for " + when); - Job job = new Job(when.getTime(), true, jobDetail); + return schedulePersistedJob(new Job(when.getTime(), true, jobDetails), when, ctime); + } + public String schedulePersistedJob(Job job, Date when, long ctime) throws ContextException { + boolean immediate = when.getTime() <= ctime + _immediateInterval; + boolean nearfuture = !immediate && when.getTime() <= ctime + _nearFutureInterval; try { if (immediate) { // If we have too many jobs in the queue, we 
don't allow any new ones
