Author: hlship
Date: Wed Jul 13 00:02:49 2011
New Revision: 1145825
URL: http://svn.apache.org/viewvc?rev=1145825&view=rev
Log:
TAP5-1572: Flesh out the implementation and a basic test of the PeriodicExecutor
Added:
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/services/
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/services/PeriodicExecutorTests.groovy
Modified:
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/services/cron/PeriodicExecutorImpl.java
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/services/cron/PeriodicJob.java
tapestry/tapestry5/trunk/tapestry-ioc/src/test/resources/log4j.properties
Modified:
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/services/cron/PeriodicExecutorImpl.java
URL:
http://svn.apache.org/viewvc/tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/services/cron/PeriodicExecutorImpl.java?rev=1145825&r1=1145824&r2=1145825&view=diff
==============================================================================
---
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/services/cron/PeriodicExecutorImpl.java
(original)
+++
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/services/cron/PeriodicExecutorImpl.java
Wed Jul 13 00:02:49 2011
@@ -14,14 +14,292 @@
package org.apache.tapestry5.ioc.internal.services.cron;
+import org.apache.tapestry5.ioc.Invokable;
+import org.apache.tapestry5.ioc.annotations.PostInjection;
+import org.apache.tapestry5.ioc.internal.util.CollectionFactory;
+import org.apache.tapestry5.ioc.services.ParallelExecutor;
+import org.apache.tapestry5.ioc.services.RegistryShutdownHub;
+import org.apache.tapestry5.ioc.services.RegistryShutdownListener;
import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor;
import org.apache.tapestry5.ioc.services.cron.PeriodicJob;
import org.apache.tapestry5.ioc.services.cron.Schedule;
+import org.slf4j.Logger;
-public class PeriodicExecutorImpl implements PeriodicExecutor
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable,
RegistryShutdownListener
{
- public PeriodicJob addJob(Schedule schedule, Runnable job)
+ private final ParallelExecutor parallelExecutor;
+
+ private final Logger logger;
+
+ // Synchronized by this
+ private final List<Job> jobs = CollectionFactory.newList();
+
+ private final Thread thread = new Thread(this, "Tapestry
PeriodicExecutor");
+
+ // Synchronized by this. Set when the registry is shut down.
+ private boolean shutdown;
+
+ private static final long FIVE_MINUTES = 5 * 60 * 1000;
+
+ private final AtomicInteger jobIdAllocator = new AtomicInteger();
+
+ private class Job implements PeriodicJob, Invokable<Void>
{
- return null;
+ final int jobId = jobIdAllocator.incrementAndGet();
+
+ private final Schedule schedule;
+
+ private final Runnable runnableJob;
+
+ private boolean executing, canceled;
+
+ private long nextExecution;
+
+ public Job(Schedule schedule, Runnable runnableJob)
+ {
+ this.schedule = schedule;
+ this.runnableJob = runnableJob;
+
+ nextExecution = schedule.firstExecution();
+ }
+
+ public synchronized long getNextExecution()
+ {
+ return nextExecution;
+ }
+
+
+ public synchronized boolean isExecuting()
+ {
+ return executing;
+ }
+
+ public synchronized boolean isCanceled()
+ {
+ return canceled;
+ }
+
+ public synchronized void cancel()
+ {
+ canceled = true;
+
+ if (!executing)
+ {
+ removeJob(this);
+ }
+
+ // Otherwise, it will be caught when the job finishes execution.
+ }
+
+ @Override
+ public synchronized String toString()
+ {
+ StringBuilder builder = new
StringBuilder("PeriodicJob[#").append(jobId);
+
+ if (executing)
+ {
+ builder.append(", executing");
+ }
+
+ if (canceled)
+ {
+ builder.append(", canceled");
+ } else
+ {
+ builder.append(String.format(", next execution
%Tk:%<TM:%<TS+%<TL", nextExecution));
+ }
+
+ return builder.append("]").toString();
+ }
+
+ /**
+ * Starts execution of the job; this sets the executing flag,
calculates the next execution time,
+ * and uses the ParallelExecutor to run the job.
+ */
+ synchronized void start()
+ {
+ executing = true;
+
+ // This is a bit naive; it assumes there will not be a delay
waiting to execute. There are a lot of options
+ // here, such as basing the next execution on the actual start
time, or even actual completion time, or allowing
+ // overlapping executions of the Job on a more rigid schedule.
Use Quartz.
+
+ nextExecution = schedule.nextExecution(System.currentTimeMillis());
+
+ parallelExecutor.invoke(this);
+
+ if (logger.isDebugEnabled())
+ {
+ logger.debug(this + " sent for execution");
+ }
+ }
+
+ synchronized void cleanupAfterExecution()
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug(this + " execution complete");
+ }
+
+ executing = false;
+
+ if (canceled)
+ {
+ removeJob(this);
+ } else
+ {
+ // Again, naive but necessary.
+ thread.interrupt();
+ }
+ }
+
+ public Void invoke()
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug(this + " starting execution");
+ }
+
+ try
+ {
+ runnableJob.run();
+ } finally
+ {
+ cleanupAfterExecution();
+ }
+
+ return null;
+ }
}
+
+ public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger
logger)
+ {
+ this.parallelExecutor = parallelExecutor;
+ this.logger = logger;
+ }
+
+ @PostInjection
+ public void start(RegistryShutdownHub hub)
+ {
+ hub.addRegistryShutdownListener(this);
+
+ thread.start();
+ }
+
+
+ synchronized void removeJob(Job job)
+ {
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Removing " + job);
+ }
+
+ jobs.remove(job);
+ }
+
+
+ public synchronized PeriodicJob addJob(Schedule schedule, Runnable job)
+ {
+ assert schedule != null;
+ assert job != null;
+
+ Job periodicJob = new Job(schedule, job);
+
+ jobs.add(periodicJob);
+
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Added " + periodicJob);
+ }
+
+ // Wake the thread so that it can start the job, if necessary.
+
+ // Technically, this is only necessary if the new job is scheduled
earlier
+ // than any job currently in the list of jobs, but this naive
implementation
+ // is simpler.
+ thread.interrupt();
+
+ return periodicJob;
+ }
+
+ public void run()
+ {
+ while (!isShutdown())
+ {
+ long nextExecution = executeCurrentBatch();
+
+ try
+ {
+ long delay = nextExecution - System.currentTimeMillis();
+
+ if (logger.isDebugEnabled())
+ {
+ logger.debug(String.format("Sleeping for %,d ms", delay));
+ }
+
+ if (delay > 0)
+ {
+ Thread.sleep(delay);
+ }
+ } catch (InterruptedException
+ ex)
+ {
+ // Ignored; the thread is interrupted() to shut it down,
+ // or to have it execute a new batch.
+
+ if (logger.isDebugEnabled())
+ {
+ logger.debug("Interrupted");
+ }
+ }
+ }
+ }
+
+ private synchronized boolean isShutdown()
+ {
+ return shutdown;
+ }
+
+ public synchronized void registryDidShutdown()
+ {
+ shutdown = true;
+
+ thread.interrupt();
+ }
+
+ /**
+ * Finds and executes the jobs that are ready to be executed.
+ *
+ * @return the next execution time (from the non-executing job that is
scheduled earliest for execution).
+ */
+ private synchronized long executeCurrentBatch()
+ {
+ long now = System.currentTimeMillis();
+ long nextExecution = now + FIVE_MINUTES;
+
+ for (Job job : jobs)
+ {
+ if (job.isExecuting())
+ {
+ continue;
+ }
+
+ long jobNextExecution = job.getNextExecution();
+
+ if (jobNextExecution <= now)
+ {
+ job.start();
+ } else
+ {
+ nextExecution = Math.min(nextExecution, jobNextExecution);
+ }
+ }
+
+ return nextExecution;
+ }
+
+
}
Modified:
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/services/cron/PeriodicJob.java
URL:
http://svn.apache.org/viewvc/tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/services/cron/PeriodicJob.java?rev=1145825&r1=1145824&r2=1145825&view=diff
==============================================================================
---
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/services/cron/PeriodicJob.java
(original)
+++
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/services/cron/PeriodicJob.java
Wed Jul 13 00:02:49 2011
@@ -26,6 +26,9 @@ public interface PeriodicJob
*/
boolean isExecuting();
+ /** Returns true if this job has been canceled. */
+ boolean isCanceled();
+
/**
* Cancels the job. If currently executing, the Job will finish (this
includes awaiting execution). If not currently
* executing, the job is discarded immediately.
Added:
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/services/PeriodicExecutorTests.groovy
URL:
http://svn.apache.org/viewvc/tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/services/PeriodicExecutorTests.groovy?rev=1145825&view=auto
==============================================================================
---
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/services/PeriodicExecutorTests.groovy
(added)
+++
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/services/PeriodicExecutorTests.groovy
Wed Jul 13 00:02:49 2011
@@ -0,0 +1,52 @@
+// Copyright 2011 The Apache Software Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.tapestry.ioc.services
+
+import org.apache.tapestry5.ioc.Registry
+import org.apache.tapestry5.ioc.services.cron.IntervalSchedule
+import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor
+import org.apache.tapestry5.ioc.services.cron.PeriodicJob
+import org.apache.tapestry5.ioc.test.IOCTestCase
+import org.testng.annotations.Test
+
+/**
+ * @since 5.3
+ */
+class PeriodicExecutorTests extends IOCTestCase
+{
+
+ @Test
+ void execution_intervals()
+ {
+ Registry r = buildRegistry()
+
+ int count = 0
+
+ def schedule = new IntervalSchedule(10)
+
+ PeriodicJob job =
r.getService(PeriodicExecutor.class).addJob(schedule, { count++; })
+
+ while (count < 10)
+ {
+ sleep 10
+ }
+
+ job.cancel()
+
+ r.shutdown()
+ }
+
+
+}
Modified:
tapestry/tapestry5/trunk/tapestry-ioc/src/test/resources/log4j.properties
URL:
http://svn.apache.org/viewvc/tapestry/tapestry5/trunk/tapestry-ioc/src/test/resources/log4j.properties?rev=1145825&r1=1145824&r2=1145825&view=diff
==============================================================================
--- tapestry/tapestry5/trunk/tapestry-ioc/src/test/resources/log4j.properties
(original)
+++ tapestry/tapestry5/trunk/tapestry-ioc/src/test/resources/log4j.properties
Wed Jul 13 00:02:49 2011
@@ -15,5 +15,7 @@ log4j.category.org.apache.tapestry5.ioc.
log4j.category.org.apache.tapestry5.ioc.AdviceDemoModule.Greeter=debug
log4j.category.com.example=debug
+log4j.category.org.apache.tapestry5.ioc.services.TapestryIOCModule.PeriodicExecutor=debug
+
#
log4j.category.org.apache.tapestry5.ioc.services.TapestryIOCModule.PlasticProxyFactory=debug