Author: hlship
Date: Wed Jul 13 00:02:49 2011
New Revision: 1145825

URL: http://svn.apache.org/viewvc?rev=1145825&view=rev
Log:
TAP5-1572: Flesh out the implementation and a basic test of the PeriodicExecutor

Added:
    tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/
    tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/
    tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/
    tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/
    
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/
    
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/services/
    
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/services/PeriodicExecutorTests.groovy
Modified:
    
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/services/cron/PeriodicExecutorImpl.java
    
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/services/cron/PeriodicJob.java
    tapestry/tapestry5/trunk/tapestry-ioc/src/test/resources/log4j.properties

Modified: 
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/services/cron/PeriodicExecutorImpl.java
URL: 
http://svn.apache.org/viewvc/tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/services/cron/PeriodicExecutorImpl.java?rev=1145825&r1=1145824&r2=1145825&view=diff
==============================================================================
--- 
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/services/cron/PeriodicExecutorImpl.java
 (original)
+++ 
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/internal/services/cron/PeriodicExecutorImpl.java
 Wed Jul 13 00:02:49 2011
@@ -14,14 +14,292 @@
 
 package org.apache.tapestry5.ioc.internal.services.cron;
 
+import org.apache.tapestry5.ioc.Invokable;
+import org.apache.tapestry5.ioc.annotations.PostInjection;
+import org.apache.tapestry5.ioc.internal.util.CollectionFactory;
+import org.apache.tapestry5.ioc.services.ParallelExecutor;
+import org.apache.tapestry5.ioc.services.RegistryShutdownHub;
+import org.apache.tapestry5.ioc.services.RegistryShutdownListener;
 import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor;
 import org.apache.tapestry5.ioc.services.cron.PeriodicJob;
 import org.apache.tapestry5.ioc.services.cron.Schedule;
+import org.slf4j.Logger;
 
-public class PeriodicExecutorImpl implements PeriodicExecutor
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+
+public class PeriodicExecutorImpl implements PeriodicExecutor, Runnable, 
RegistryShutdownListener
 {
-    public PeriodicJob addJob(Schedule schedule, Runnable job)
+    private final ParallelExecutor parallelExecutor;
+
+    private final Logger logger;
+
+    // Synchronized by this
+    private final List<Job> jobs = CollectionFactory.newList();
+
+    private final Thread thread = new Thread(this, "Tapestry 
PeriodicExecutor");
+
+    // Synchronized by this. Set when the registry is shutdown.
+    private boolean shutdown;
+
+    private static final long FIVE_MINUTES = 5 * 60 * 1000;
+
+    private final AtomicInteger jobIdAllocator = new AtomicInteger();
+
+    private class Job implements PeriodicJob, Invokable<Void>
     {
-        return null;
+        final int jobId = jobIdAllocator.incrementAndGet();
+
+        private final Schedule schedule;
+
+        private final Runnable runnableJob;
+
+        private boolean executing, canceled;
+
+        private long nextExecution;
+
+        public Job(Schedule schedule, Runnable runnableJob)
+        {
+            this.schedule = schedule;
+            this.runnableJob = runnableJob;
+
+            nextExecution = schedule.firstExecution();
+        }
+
+        public synchronized long getNextExecution()
+        {
+            return nextExecution;
+        }
+
+
+        public synchronized boolean isExecuting()
+        {
+            return executing;
+        }
+
+        public synchronized boolean isCanceled()
+        {
+            return canceled;
+        }
+
+        public synchronized void cancel()
+        {
+            canceled = true;
+
+            if (!executing)
+            {
+                removeJob(this);
+            }
+
+            // Otherwise, it will be caught when the job finishes execution.
+        }
+
+        @Override
+        public synchronized String toString()
+        {
+            StringBuilder builder = new 
StringBuilder("PeriodicJob[#").append(jobId);
+
+            if (executing)
+            {
+                builder.append(", executing");
+            }
+
+            if (canceled)
+            {
+                builder.append(", canceled");
+            } else
+            {
+                builder.append(String.format(", next execution 
%Tk:%<TM:%<TS+%<TL", nextExecution));
+            }
+
+            return builder.append("]").toString();
+        }
+
+        /**
+         * Starts execution of the job; this sets the executing flag, 
calculates the next excecution time,
+         * and uses the ParallelExecutor to run the job.
+         */
+        synchronized void start()
+        {
+            executing = true;
+
+            // This is a bit naive; it assumes there will not be a delay 
waiting to execute. There's a lot of options
+            // here, such as basing the next execution on the actual start 
time, or event actual completion time, or allowing
+            // overlapping executions of the Job on a more rigid schedule.  
Use Quartz.
+
+            nextExecution = schedule.nextExecution(System.currentTimeMillis());
+
+            parallelExecutor.invoke(this);
+
+            if (logger.isDebugEnabled())
+            {
+                logger.debug(this + " sent for execution");
+            }
+        }
+
+        synchronized void cleanupAfterExecution()
+        {
+            if (logger.isDebugEnabled())
+            {
+                logger.debug(this + " execution complete");
+            }
+
+            executing = false;
+
+            if (canceled)
+            {
+                removeJob(this);
+            } else
+            {
+                // Again, naive but necessary.
+                thread.interrupt();
+            }
+        }
+
+        public Void invoke()
+        {
+            if (logger.isDebugEnabled())
+            {
+                logger.debug(this + " starting execution");
+            }
+
+            try
+            {
+                runnableJob.run();
+            } finally
+            {
+                cleanupAfterExecution();
+            }
+
+            return null;
+        }
     }
+
+    public PeriodicExecutorImpl(ParallelExecutor parallelExecutor, Logger 
logger)
+    {
+        this.parallelExecutor = parallelExecutor;
+        this.logger = logger;
+    }
+
+    @PostInjection
+    public void start(RegistryShutdownHub hub)
+    {
+        hub.addRegistryShutdownListener(this);
+
+        thread.start();
+    }
+
+
+    synchronized void removeJob(Job job)
+    {
+        if (logger.isDebugEnabled())
+        {
+            logger.debug("Removing " + job);
+        }
+
+        jobs.remove(job);
+    }
+
+
+    public synchronized PeriodicJob addJob(Schedule schedule, Runnable job)
+    {
+        assert schedule != null;
+        assert job != null;
+
+        Job periodicJob = new Job(schedule, job);
+
+        jobs.add(periodicJob);
+
+        if (logger.isDebugEnabled())
+        {
+            logger.debug("Added " + periodicJob);
+        }
+
+        // Wake the thread so that it can start the job, if necessary.
+
+        // Technically, this is only necessary if the new job is scheduled 
earlier
+        // than any job currently in the list of jobs, but this naive 
implementation
+        // is simpler.
+        thread.interrupt();
+
+        return periodicJob;
+    }
+
+    public void run()
+    {
+        while (!isShutdown())
+        {
+            long nextExecution = executeCurrentBatch();
+
+            try
+            {
+                long delay = nextExecution - System.currentTimeMillis();
+
+                if (logger.isDebugEnabled())
+                {
+                    logger.debug(String.format("Sleeping for %,d ms", delay));
+                }
+
+                if (delay > 0)
+                {
+                    Thread.sleep(delay);
+                }
+            } catch (InterruptedException
+                    ex)
+            {
+                // Ignored; the thread is interrupted() to shut it down,
+                // or to have it execute a new batch.
+
+                if (logger.isDebugEnabled())
+                {
+                    logger.debug("Interrupted");
+                }
+            }
+        }
+    }
+
+    private synchronized boolean isShutdown()
+    {
+        return shutdown;
+    }
+
+    public synchronized void registryDidShutdown()
+    {
+        shutdown = true;
+
+        thread.interrupt();
+    }
+
+    /**
+     * Finds jobs and executes jobs that are ready to be executed.
+     *
+     * @return the next execution time (from the non-executing job that is 
scheduled earliest for execution).
+     */
+    private synchronized long executeCurrentBatch()
+    {
+        long now = System.currentTimeMillis();
+        long nextExecution = now + FIVE_MINUTES;
+
+        for (Job job : jobs)
+        {
+            if (job.isExecuting())
+            {
+                continue;
+            }
+
+            long jobNextExecution = job.getNextExecution();
+
+            if (jobNextExecution <= now)
+            {
+                job.start();
+            } else
+            {
+                nextExecution = Math.min(nextExecution, jobNextExecution);
+            }
+        }
+
+        return nextExecution;
+    }
+
+
 }

Modified: 
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/services/cron/PeriodicJob.java
URL: 
http://svn.apache.org/viewvc/tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/services/cron/PeriodicJob.java?rev=1145825&r1=1145824&r2=1145825&view=diff
==============================================================================
--- 
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/services/cron/PeriodicJob.java
 (original)
+++ 
tapestry/tapestry5/trunk/tapestry-ioc/src/main/java/org/apache/tapestry5/ioc/services/cron/PeriodicJob.java
 Wed Jul 13 00:02:49 2011
@@ -26,6 +26,9 @@ public interface PeriodicJob
      */
     boolean isExecuting();
 
+    /** Has this job been canceled. */
+    boolean isCanceled();
+
     /**
      * Cancels the job. If currently executing, the Job will finish (this 
includes awaiting execution). If not currently
      * executing, the job is discarded immediately.

Added: 
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/services/PeriodicExecutorTests.groovy
URL: 
http://svn.apache.org/viewvc/tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/services/PeriodicExecutorTests.groovy?rev=1145825&view=auto
==============================================================================
--- 
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/services/PeriodicExecutorTests.groovy
 (added)
+++ 
tapestry/tapestry5/trunk/tapestry-ioc/src/test/groovy/org/apache/tapestry/ioc/services/PeriodicExecutorTests.groovy
 Wed Jul 13 00:02:49 2011
@@ -0,0 +1,52 @@
+// Copyright 2011 The Apache Software Foundation
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package org.apache.tapestry.ioc.services
+
+import org.apache.tapestry5.ioc.Registry
+import org.apache.tapestry5.ioc.services.cron.IntervalSchedule
+import org.apache.tapestry5.ioc.services.cron.PeriodicExecutor
+import org.apache.tapestry5.ioc.services.cron.PeriodicJob
+import org.apache.tapestry5.ioc.test.IOCTestCase
+import org.testng.annotations.Test
+
+/**
+ * @since 5.3
+ */
+class PeriodicExecutorTests extends IOCTestCase
+{
+
+    @Test
+    void execution_intervals()
+    {
+        Registry r = buildRegistry()
+
+        int count = 0
+
+        def schedule = new IntervalSchedule(10)
+
+        PeriodicJob job = 
r.getService(PeriodicExecutor.class).addJob(schedule, { count++; })
+
+        while (count < 10)
+        {
+            sleep 10
+        }
+
+        job.cancel()
+
+        r.shutdown()
+    }
+
+
+}

Modified: 
tapestry/tapestry5/trunk/tapestry-ioc/src/test/resources/log4j.properties
URL: 
http://svn.apache.org/viewvc/tapestry/tapestry5/trunk/tapestry-ioc/src/test/resources/log4j.properties?rev=1145825&r1=1145824&r2=1145825&view=diff
==============================================================================
--- tapestry/tapestry5/trunk/tapestry-ioc/src/test/resources/log4j.properties 
(original)
+++ tapestry/tapestry5/trunk/tapestry-ioc/src/test/resources/log4j.properties 
Wed Jul 13 00:02:49 2011
@@ -15,5 +15,7 @@ log4j.category.org.apache.tapestry5.ioc.
 log4j.category.org.apache.tapestry5.ioc.AdviceDemoModule.Greeter=debug
 
 log4j.category.com.example=debug
+log4j.category.org.apache.tapestry5.ioc.services.TapestryIOCModule.PeriodicExecutor=debug
+
 
 # 
log4j.category.org.apache.tapestry5.ioc.services.TapestryIOCModule.PlasticProxyFactory=debug


Reply via email to