ZEST-128 - Fixed up Scheduler library so that schedules are not lost. Tried to introduce better separation of concerns. Added counters for Execution and Exceptions.
Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/02491a34 Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/02491a34 Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/02491a34 Branch: refs/heads/develop Commit: 02491a34e8ee1a118e358f5ed4f667b4d70eae82 Parents: 80284fb Author: Niclas Hedhman <[email protected]> Authored: Sat Nov 14 11:09:38 2015 +0800 Committer: Niclas Hedhman <[email protected]> Committed: Sat Nov 14 11:09:38 2015 +0800 ---------------------------------------------------------------------- .../DefaultEntityStoreUnitOfWork.java | 1 - .../ContextResourceClientFactoryTest.java | 4 +- .../restlet/identity/IdentityManager.java | 4 +- .../zest/library/scheduler/Execution.java | 240 ++++++++++++++ .../zest/library/scheduler/Scheduler.java | 53 +++- .../scheduler/SchedulerConfiguration.java | 4 +- .../zest/library/scheduler/SchedulerMixin.java | 318 ++++--------------- .../library/scheduler/SchedulerService.java | 53 +--- .../library/scheduler/SchedulesHandler.java | 89 ++++++ .../org/apache/zest/library/scheduler/Task.java | 23 +- .../zest/library/scheduler/TaskRunner.java | 69 ++++ .../scheduler/bootstrap/SchedulerAssembler.java | 14 +- .../defaults/DefaultRejectionHandler.java | 39 +++ .../defaults/DefaultScheduleFactoryMixin.java | 91 ++++++ .../defaults/DefaultThreadFactory.java | 56 ++++ .../library/scheduler/schedule/Schedule.java | 55 +++- .../scheduler/schedule/ScheduleFactory.java | 115 +------ .../scheduler/schedule/ScheduleTime.java | 21 +- .../scheduler/schedule/cron/CronSchedule.java | 22 +- .../scheduler/schedule/once/OnceSchedule.java | 22 +- .../library/scheduler/timeline/Timeline.java | 4 +- .../timeline/TimelineScheduleMixin.java | 5 +- .../timeline/TimelineSchedulerServiceMixin.java | 30 +- .../scheduler/AbstractSchedulerTest.java | 22 +- .../apache/zest/library/scheduler/FooTask.java | 19 +- .../zest/library/scheduler/SchedulerTest.java | 
87 +++-- .../scheduler/docsupport/SchedulerDocs.java | 2 +- 27 files changed, 938 insertions(+), 524 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/core/spi/src/main/java/org/apache/zest/spi/entitystore/DefaultEntityStoreUnitOfWork.java ---------------------------------------------------------------------- diff --git a/core/spi/src/main/java/org/apache/zest/spi/entitystore/DefaultEntityStoreUnitOfWork.java b/core/spi/src/main/java/org/apache/zest/spi/entitystore/DefaultEntityStoreUnitOfWork.java index bb873d1..f6e48f1 100755 --- a/core/spi/src/main/java/org/apache/zest/spi/entitystore/DefaultEntityStoreUnitOfWork.java +++ b/core/spi/src/main/java/org/apache/zest/spi/entitystore/DefaultEntityStoreUnitOfWork.java @@ -82,7 +82,6 @@ public final class DefaultEntityStoreUnitOfWork public EntityState entityStateOf( ModuleSpi module, EntityReference anIdentity ) throws EntityNotFoundException { - EntityState entityState = states.get( anIdentity ); if( entityState != null ) { http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/rest-client/src/test/java/org/apache/zest/library/rest/client/ContextResourceClientFactoryTest.java ---------------------------------------------------------------------- diff --git a/libraries/rest-client/src/test/java/org/apache/zest/library/rest/client/ContextResourceClientFactoryTest.java b/libraries/rest-client/src/test/java/org/apache/zest/library/rest/client/ContextResourceClientFactoryTest.java index 0efbe9d..47ff667 100644 --- a/libraries/rest-client/src/test/java/org/apache/zest/library/rest/client/ContextResourceClientFactoryTest.java +++ b/libraries/rest-client/src/test/java/org/apache/zest/library/rest/client/ContextResourceClientFactoryTest.java @@ -21,6 +21,7 @@ package org.apache.zest.library.rest.client; import java.io.File; import java.io.IOException; import java.util.Collections; +import 
org.apache.zest.api.usecase.UsecaseBuilder; import org.hamcrest.CoreMatchers; import org.junit.After; import org.junit.Assert; @@ -597,7 +598,8 @@ public class ContextResourceClientFactoryTest public void beforeCompletion() throws UnitOfWorkCompletionException { - throw new ConcurrentEntityModificationException( Collections.<EntityComposite>emptyList() ); + throw new ConcurrentEntityModificationException( Collections.<EntityComposite>emptyList(), + UsecaseBuilder.newUsecase( "Testing" ) ); } public void afterCompletion( UnitOfWorkStatus status ) http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/restlet/src/main/java/org/apache/zest/library/restlet/identity/IdentityManager.java ---------------------------------------------------------------------- diff --git a/libraries/restlet/src/main/java/org/apache/zest/library/restlet/identity/IdentityManager.java b/libraries/restlet/src/main/java/org/apache/zest/library/restlet/identity/IdentityManager.java index 5286413..0f659f9 100644 --- a/libraries/restlet/src/main/java/org/apache/zest/library/restlet/identity/IdentityManager.java +++ b/libraries/restlet/src/main/java/org/apache/zest/library/restlet/identity/IdentityManager.java @@ -40,7 +40,7 @@ import static org.apache.zest.functional.Iterables.first; @Concerns( { UnitOfWorkConcern.class } ) public interface IdentityManager { - char SEPARATOR = '~'; + String SEPARATOR = "~"; String IDENTITY_SIGNATURE = "[0-9][0-9]*~.*"; boolean isIdentity( String candidate ); @@ -123,7 +123,7 @@ public interface IdentityManager @Override public Class extractType( String identity ) { - if( isIdentity( identity ) ) + if( !isIdentity( identity ) ) { throw new IllegalArgumentException( "Given argument '" + identity + "' is not an Identity" ); } http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java ---------------------------------------------------------------------- diff --git 
a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java new file mode 100644 index 0000000..dbb3b72 --- /dev/null +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.zest.library.scheduler; + +import java.lang.reflect.UndeclaredThrowableException; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.RunnableFuture; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import org.apache.zest.api.concern.Concerns; +import org.apache.zest.api.configuration.Configuration; +import org.apache.zest.api.injection.scope.Structure; +import org.apache.zest.api.injection.scope.This; +import org.apache.zest.api.mixin.Mixins; +import org.apache.zest.api.structure.Module; +import org.apache.zest.api.unitofwork.NoSuchEntityException; +import org.apache.zest.api.unitofwork.UnitOfWork; +import org.apache.zest.api.unitofwork.UnitOfWorkCompletionException; +import org.apache.zest.api.unitofwork.concern.UnitOfWorkConcern; +import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation; +import org.apache.zest.api.unitofwork.concern.UnitOfWorkRetry; +import org.apache.zest.library.scheduler.schedule.Schedule; +import org.apache.zest.library.scheduler.schedule.ScheduleTime; + +@Mixins( Execution.ExecutionMixin.class ) +@Concerns( UnitOfWorkConcern.class ) +public interface Execution +{ + + void dispatchForExecution( Schedule schedule ); + + void start() + throws Exception; + + void stop() + throws Exception; + + @UnitOfWorkPropagation + @UnitOfWorkRetry( retries = 3 ) + void updateNextTime( ScheduleTime schedule ); + + class ExecutionMixin + implements Execution, Runnable + { + private static final ThreadGroup TG = new ThreadGroup( "Zest Scheduling" ); + + @Structure + private Module module; + + @This + private Scheduler scheduler; + + @This + private Configuration<SchedulerConfiguration> config; + + @This + private ThreadFactory threadFactory; + + @This + private RejectedExecutionHandler 
rejectionHandler; + + private final SortedSet<ScheduleTime> timingQueue = new TreeSet<>(); + private volatile boolean running; + private ThreadPoolExecutor taskExecutor; + private Thread scheduleThread; + + @Override + @UnitOfWorkPropagation + public void run() + { + synchronized( this ) + { + running = true; + while( running ) + { + try + { + if( timingQueue.size() > 0 ) + { + ScheduleTime scheduleTime = timingQueue.first(); + waitFor( scheduleTime ); + timingQueue.remove( scheduleTime ); + updateNextTime( scheduleTime ); + submitTaskForExecution( scheduleTime ); + } + else + { + this.wait( 100 ); + } + } + catch( InterruptedException e ) + { + // Ignore. Used to signal "Hey, wake up. Time to work..." + System.out.println("Interrupted"); + } + } + } + } + + private void waitFor( ScheduleTime scheduleTime ) + throws InterruptedException + { + long now = System.currentTimeMillis(); + long waitingTime = scheduleTime.nextTime() - now; + if( waitingTime > 0 ) + { + this.wait( waitingTime ); + } + } + + @Override + public void updateNextTime( ScheduleTime scheduleTime ) + { + long now = System.currentTimeMillis(); + + try (UnitOfWork uow = module.newUnitOfWork()) + { + try + { + Schedule schedule = uow.get( Schedule.class, scheduleTime.scheduleIdentity() ); + long nextTime = schedule.nextRun( now ); + if( nextTime != Long.MIN_VALUE ) + { + scheduleTime = new ScheduleTime( schedule.identity().get(), nextTime ); + timingQueue.add( scheduleTime ); + } + } + catch( NoSuchEntityException e ) + { + // Schedule has been removed. 
+ scheduler.cancelSchedule( scheduleTime.scheduleIdentity() ); + } + uow.complete(); + } + catch( UnitOfWorkCompletionException e ) + { + throw new UndeclaredThrowableException( e ); + } + } + + private void submitTaskForExecution( ScheduleTime scheduleTime ) + { + Runnable taskRunner = module.newTransient( Runnable.class, scheduleTime ); + this.taskExecutor.submit( taskRunner ); + } + + public void dispatchForExecution( Schedule schedule ) + { + long now = System.currentTimeMillis(); + synchronized( this ) + { + long nextRun = schedule.nextRun( now ); + if( nextRun > 0 ) + { + timingQueue.add( new ScheduleTime( schedule.identity().get(), nextRun ) ); + scheduleThread.interrupt(); + } + } + } + + @Override + public void start() + throws Exception + { + SchedulerConfiguration configuration = config.get(); + Integer workersCount = configuration.workersCount().get(); + Integer workQueueSize = configuration.workQueueSize().get(); + createThreadPoolExecutor( workersCount, workQueueSize ); + taskExecutor.prestartAllCoreThreads(); + + scheduleThread = new Thread( TG, this, "Scheduler" ); + scheduleThread.start(); + } + + private void createThreadPoolExecutor( Integer workersCount, Integer workQueueSize ) + { + int corePoolSize = 2; + if( workersCount > 4 ) + { + corePoolSize = workersCount / 4 + 1; + } + if( corePoolSize > 50 ) + { + corePoolSize = 20; + } + if( workersCount > 200 ) + { + workersCount = 200; + } + taskExecutor = new ThreadPoolExecutor( corePoolSize, workersCount, + 0, TimeUnit.MILLISECONDS, + new LinkedBlockingQueue<>( workQueueSize ), + threadFactory, rejectionHandler ); + } + + @Override + public void stop() + throws Exception + { + + running = false; + synchronized( this ) + { + scheduleThread.interrupt(); + } + taskExecutor.shutdown(); + try + { + taskExecutor.awaitTermination( 5, TimeUnit.SECONDS ); + } + catch( InterruptedException e ) + { + e.printStackTrace(); + } + taskExecutor.shutdownNow(); + } + } +} 
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Scheduler.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Scheduler.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Scheduler.java index 2752461..f8aae19 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Scheduler.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Scheduler.java @@ -45,10 +45,12 @@ import static org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation.Propa * By default, a {@link Schedule} is not durable. In other words, it do not survive an {@link Application} restart. * </p> * <p> - * To make a {@link Schedule} durable, set it's durable property to true once its scheduled. + * All {@link Schedule}s are durable and stored in the visible {@link org.apache.zest.spi.entitystore.EntityStore} like + * any ordinary {@link org.apache.zest.api.entity.EntityComposite}. There is also a {@link org.apache.zest.library.scheduler.schedule.Schedules} + * entity composite that has Associations to all active, completed and cancelled schedules. * </p> * <p> - * Durable {@link Schedule}s that have no future run are removed by {@code SchedulerGarbageCollector} (not implemented?). + * * </p> */ @Concerns( UnitOfWorkConcern.class ) @@ -59,36 +61,33 @@ public interface Scheduler * * @param task Task to be scheduled once * @param initialSecondsDelay Initial delay the Task will be run after, in seconds - * @param durable true if this Schedule should survive a restart. 
* * @return The newly created Schedule */ @UnitOfWorkPropagation( MANDATORY ) - Schedule scheduleOnce( Task task, int initialSecondsDelay, boolean durable ); + Schedule scheduleOnce( Task task, int initialSecondsDelay ); /** * Schedule a Task to be run after a given initial delay in seconds. * - * @param task Task to be scheduled once - * @param runAt The future point in time when the Schedule will be run. - * @param durable true if this Schedule should survive a restart. + * @param task Task to be scheduled once + * @param runAt The future point in time when the Schedule will be run. * * @return The newly created Schedule */ @UnitOfWorkPropagation( MANDATORY ) - Schedule scheduleOnce( Task task, DateTime runAt, boolean durable ); + Schedule scheduleOnce( Task task, DateTime runAt ); /** * Schedule a Task using a CronExpression. * * @param task Task to be scheduled once * @param cronExpression CronExpression for creating the Schedule for the given Task - * @param durable true if this Schedule should survive a restart. * * @return The newly created Schedule */ @UnitOfWorkPropagation( MANDATORY ) - Schedule scheduleCron( Task task, @CronExpression String cronExpression, boolean durable ); + Schedule scheduleCron( Task task, @CronExpression String cronExpression ); /** * Schedule a Task using a CronExpression with a given initial delay in milliseconds. @@ -96,12 +95,11 @@ public interface Scheduler * @param task Task to be scheduled once * @param cronExpression CronExpression for creating the Schedule for the given Task * @param initialDelay Initial delay the Schedule will be active after, in milliseconds - * @param durable true if this Schedule should survive a restart. 
* * @return The newly created Schedule */ @UnitOfWorkPropagation( MANDATORY ) - Schedule scheduleCron( Task task, @CronExpression String cronExpression, long initialDelay, boolean durable ); + Schedule scheduleCron( Task task, @CronExpression String cronExpression, long initialDelay ); /** * Schedule a Task using a CronExpression starting at a given date. @@ -109,10 +107,35 @@ public interface Scheduler * @param task Task to be scheduled once * @param cronExpression CronExpression for creating the Schedule for the given Task * @param start Date from which the Schedule will become active - * @param durable true if this Schedule should survive a restart. * * @return The newly created Schedule */ @UnitOfWorkPropagation( MANDATORY ) - Schedule scheduleCron( Task task, @CronExpression String cronExpression, DateTime start, boolean durable ); -} + Schedule scheduleCron( Task task, @CronExpression String cronExpression, DateTime start ); + + /** Schedules a custom Schedule. + * + * + * @param schedule The Schedule instance to be scheduled. + */ + @UnitOfWorkPropagation( MANDATORY ) + void scheduleCron( Schedule schedule ); + + /** Cancels a Schedule. + * Reads the Schedule from the EntityStore and calls {@link #cancelSchedule(Schedule)}. + * + * @param scheduleId The identity of the Schedule to be cancelled. + */ + @UnitOfWorkPropagation( MANDATORY ) + void cancelSchedule( String scheduleId ); + + /** Cancels the provided Schedule. + * + * Cancellation can be done before, while and after execution of the Schedule. If the execution + * is in progress, it will not be interrupted. + * + * @param schedule The schedule to be cancelled. 
+ */ + @UnitOfWorkPropagation( MANDATORY ) + public void cancelSchedule( Schedule schedule ); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java index e338c31..0ebc81d 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerConfiguration.java @@ -33,13 +33,13 @@ public interface SchedulerConfiguration /** * @return Number of worker threads, optional and defaults to the number of available cores. */ - @Optional + @Optional @UseDefaults Property<Integer> workersCount(); /** * @return Size of the queue to use for holding tasks before they are run, optional and defaults to 10. 
*/ - @Optional + @Optional @UseDefaults Property<Integer> workQueueSize(); /** http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java index 69329dc..52c2f56 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java @@ -18,16 +18,6 @@ */ package org.apache.zest.library.scheduler; -import java.util.SortedSet; -import java.util.TreeSet; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledFuture; -import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.ThreadFactory; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import org.apache.zest.api.configuration.Configuration; import org.apache.zest.api.injection.scope.Service; import org.apache.zest.api.injection.scope.Structure; @@ -37,11 +27,9 @@ import org.apache.zest.api.structure.Module; import org.apache.zest.api.unitofwork.NoSuchEntityException; import org.apache.zest.api.unitofwork.UnitOfWork; import org.apache.zest.api.unitofwork.UnitOfWorkCompletionException; -import org.apache.zest.api.usecase.Usecase; import org.apache.zest.api.usecase.UsecaseBuilder; import org.apache.zest.library.scheduler.schedule.Schedule; import org.apache.zest.library.scheduler.schedule.ScheduleFactory; -import org.apache.zest.library.scheduler.schedule.ScheduleTime; import org.apache.zest.library.scheduler.schedule.Schedules; import 
org.apache.zest.library.scheduler.schedule.cron.CronExpression; import org.joda.time.DateTime; @@ -52,17 +40,10 @@ public class SchedulerMixin implements Scheduler, ServiceActivation { private static final Logger LOGGER = LoggerFactory.getLogger( Scheduler.class ); - private static final int DEFAULT_WORKERS_COUNT = Runtime.getRuntime().availableProcessors() + 1; - private static final int DEFAULT_WORKQUEUE_SIZE = 10; @Service private ScheduleFactory scheduleFactory; - private final SortedSet<ScheduleTime> timingQueue = new TreeSet<>(); - - private ScheduledExecutorService managementExecutor; - private ThreadPoolExecutor taskExecutor; - @Structure private Module module; @@ -70,304 +51,137 @@ public class SchedulerMixin private SchedulerService me; @This - private ThreadFactory threadFactory; + private SchedulesHandler schedulesHandler; @This - private RejectedExecutionHandler rejectionHandler; + private Execution execution; @This private Configuration<SchedulerConfiguration> config; - private ScheduleHandler scheduleHandler; + public SchedulerMixin() + { + } @Override - public Schedule scheduleOnce( Task task, int initialSecondsDelay, boolean durable ) + public Schedule scheduleOnce( Task task, int initialSecondsDelay ) { long now = System.currentTimeMillis(); - Schedule schedule = scheduleFactory.newOnceSchedule( task, new DateTime( now + initialSecondsDelay * 1000 ), durable ); - if( durable ) - { - Schedules schedules = module.currentUnitOfWork().get( Schedules.class, getSchedulesIdentity( me ) ); - schedules.schedules().add( schedule ); - } - dispatchForExecution( schedule ); + Schedule schedule = scheduleFactory.newOnceSchedule( task, new DateTime( now + initialSecondsDelay * 1000 ) ); + saveAndDispatch( schedule ); return schedule; } @Override - public Schedule scheduleOnce( Task task, DateTime runAt, boolean durable ) + public Schedule scheduleOnce( Task task, DateTime runAt ) { - Schedule schedule = scheduleFactory.newOnceSchedule( task, runAt, durable ); - 
dispatchForExecution( schedule ); - if( durable ) - { - Schedules schedules = module.currentUnitOfWork().get( Schedules.class, getSchedulesIdentity( me ) ); - schedules.schedules().add( schedule ); - } + Schedule schedule = scheduleFactory.newOnceSchedule( task, runAt ); + saveAndDispatch( schedule ); return schedule; } @Override - public Schedule scheduleCron( Task task, String cronExpression, boolean durable ) + public Schedule scheduleCron( Task task, String cronExpression ) { DateTime now = new DateTime(); - Schedule schedule = scheduleFactory.newCronSchedule( task, cronExpression, now, durable ); - if( durable ) - { - Schedules schedules = module.currentUnitOfWork().get( Schedules.class, getSchedulesIdentity( me ) ); - schedules.schedules().add( schedule ); - } - dispatchForExecution( schedule ); + Schedule schedule = scheduleFactory.newCronSchedule( task, cronExpression, now ); + saveAndDispatch( schedule ); return schedule; } @Override - public Schedule scheduleCron( Task task, @CronExpression String cronExpression, DateTime start, boolean durable ) + public Schedule scheduleCron( Task task, @CronExpression String cronExpression, DateTime start ) { - Schedule schedule = scheduleFactory.newCronSchedule( task, cronExpression, start, durable ); - if( durable ) - { - Schedules schedules = module.currentUnitOfWork().get( Schedules.class, getSchedulesIdentity( me ) ); - schedules.schedules().add( schedule ); - } - dispatchForExecution( schedule ); + Schedule schedule = scheduleFactory.newCronSchedule( task, cronExpression, start ); + saveAndDispatch( schedule ); return schedule; } @Override - public Schedule scheduleCron( Task task, String cronExpression, long initialDelay, boolean durable ) - { - DateTime start = new DateTime( System.currentTimeMillis() + initialDelay ); - Schedule schedule = scheduleFactory.newCronSchedule( task, cronExpression, start, durable ); - if( durable ) - { - Schedules schedules = module.currentUnitOfWork().get( Schedules.class, 
getSchedulesIdentity( me ) ); - schedules.schedules().add( schedule ); - } - dispatchForExecution( schedule ); - return schedule; - } - - private void dispatchForExecution( Schedule schedule ) - { - long now = System.currentTimeMillis(); - synchronized( timingQueue ) - { - if( timingQueue.size() == 0 ) - { - long nextRun = schedule.nextRun( now ); - if( nextRun < 0 ) - { - return; - } - timingQueue.add( new ScheduleTime( schedule.identity().get(), nextRun ) ); - if( scheduleHandler == null ) - { - dispatchHandler(); - } - } - else - { - ScheduleTime first = timingQueue.first(); - long nextRun = schedule.nextRun( now ); - if( nextRun < 0 ) - { - return; - } - timingQueue.add( new ScheduleTime( schedule.identity().get(), nextRun ) ); - ScheduleTime newFirst = timingQueue.first(); - if( !first.equals( newFirst ) ) - { - // We need to restart the managementThread, which is currently waiting for a 'later' event to - // occur than the one that was just scheduled. - if( scheduleHandler != null && scheduleHandler.future != null ) - { - scheduleHandler.future.cancel( true ); - } - dispatchHandler(); - } - } - } - } - - private void dispatchHandler() + public void scheduleCron( Schedule schedule ) { - scheduleHandler = new ScheduleHandler(); - managementExecutor.schedule( scheduleHandler, timingQueue.first().nextTime - System.currentTimeMillis(), TimeUnit.MILLISECONDS ); + saveAndDispatch( schedule ); } @Override - public void activateService() - throws Exception + public Schedule scheduleCron( Task task, String cronExpression, long initialDelay ) { - // Handle configuration defaults - SchedulerConfiguration configuration = config.get(); - Integer workersCount = configuration.workersCount().get(); - Integer workQueueSize = configuration.workQueueSize().get(); - - if( workersCount == null ) - { - workersCount = DEFAULT_WORKERS_COUNT; - LOGGER.debug( "Workers count absent from configuration, falled back to default: {} workers", DEFAULT_WORKERS_COUNT ); - } - if( workQueueSize 
== null ) - { - workQueueSize = DEFAULT_WORKQUEUE_SIZE; - LOGGER.debug( "WorkQueue size absent from configuration, falled back to default: {}", DEFAULT_WORKQUEUE_SIZE ); - } - - int corePoolSize = 2; - if( workersCount > 4 ) - { - corePoolSize = workersCount / 4; - } - // Throws IllegalArgument if corePoolSize or keepAliveTime less than zero, - // or if workersCount less than or equal to zero, - // or if corePoolSize greater than workersCount. - taskExecutor = new ThreadPoolExecutor( corePoolSize, workersCount, - 0, TimeUnit.MILLISECONDS, - new LinkedBlockingQueue<Runnable>( workQueueSize ), - threadFactory, rejectionHandler ); - taskExecutor.prestartAllCoreThreads(); - managementExecutor = new ScheduledThreadPoolExecutor( 2, threadFactory, rejectionHandler ); - loadSchedules(); - LOGGER.debug( "Activated" ); + DateTime start = new DateTime( System.currentTimeMillis() + initialDelay ); + Schedule schedule = scheduleFactory.newCronSchedule( task, cronExpression, start ); + saveAndDispatch( schedule ); + return schedule; } - private void loadSchedules() - throws UnitOfWorkCompletionException + @Override + public void cancelSchedule( String scheduleId ) { - UnitOfWork uow = module.newUnitOfWork(); + UnitOfWork uow = module.currentUnitOfWork(); + Schedule schedule = null; try { - Schedules schedules = uow.get( Schedules.class, getSchedulesIdentity( me ) ); - for( Schedule schedule : schedules.schedules() ) - { - dispatchForExecution( schedule ); - } + schedule = uow.get( Schedule.class, scheduleId ); } catch( NoSuchEntityException e ) { - // Create a new Schedules entity for keeping track of them all. 
- uow.newEntity( Schedules.class, getSchedulesIdentity( me ) ); - uow.complete(); - } - finally - { - if( uow.isOpen() ) - { - uow.discard(); - } + return; } + cancelSchedule( schedule ); } - public static String getSchedulesIdentity( SchedulerService service ) + @Override + public void cancelSchedule( Schedule schedule ) { - return "Schedules:" + service.identity().get(); + Schedules active = schedulesHandler.getActiveSchedules(); + if( active.schedules().remove( schedule ) ) + { + schedule.cancelled().set( true ); + } } - @Override - public void passivateService() - throws Exception + private void saveAndDispatch( Schedule schedule ) { - managementExecutor.shutdown(); - taskExecutor.shutdown(); - - managementExecutor.awaitTermination( 5, TimeUnit.SECONDS ); - managementExecutor.shutdownNow(); - - taskExecutor.awaitTermination( 5, TimeUnit.SECONDS ); - taskExecutor.shutdownNow(); - - LOGGER.debug( "Passivated" ); + Schedules schedules = schedulesHandler.getActiveSchedules(); + schedules.schedules().add( schedule ); + execution.dispatchForExecution( schedule ); } - /** - * This little bugger wakes up when it is time to dispatch a Task, creates the Runner and dispatches itself - * for the next run. 
- */ - class ScheduleHandler - implements Runnable + private void loadSchedules() + throws UnitOfWorkCompletionException { - private ScheduledFuture<?> future; - - @Override - public void run() + try (UnitOfWork ignored = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Initialize Schedules" ) )) { - synchronized( timingQueue ) + Schedules schedules = schedulesHandler.getActiveSchedules(); + for( Schedule schedule : schedules.schedules() ) { - ScheduleTime scheduleTime = timingQueue.first(); - timingQueue.remove( scheduleTime ); - ScheduleRunner scheduleRunner = new ScheduleRunner( scheduleTime, SchedulerMixin.this, module ); - taskExecutor.submit( scheduleRunner ); - if( timingQueue.size() == 0 ) + if( schedule.cancelled().get() || schedule.done().get() ) { - scheduleHandler = null; + schedules.schedules().remove( schedule ); } else { - ScheduleTime nextTime = timingQueue.first(); - future = managementExecutor.schedule( scheduleHandler, nextTime.nextTime, TimeUnit.MILLISECONDS ); + execution.dispatchForExecution( schedule ); } } } } - /** - * Handle {@link Task}'s {@link org.apache.zest.api.unitofwork.UnitOfWork} and {@link org.apache.zest.library.scheduler.timeline.TimelineRecord}s creation. - */ - public static class ScheduleRunner - implements Runnable + @Override + public void activateService() + throws Exception { - private final Module module; - private final ScheduleTime schedule; - private final SchedulerMixin schedulerMixin; - - public ScheduleRunner( ScheduleTime schedule, SchedulerMixin schedulerMixin, Module module ) - { - this.schedule = schedule; - this.schedulerMixin = schedulerMixin; - this.module = module; - } + // Throws IllegalArgument if corePoolSize or keepAliveTime less than zero, + // or if workersCount less than or equal to zero, + // or if corePoolSize greater than workersCount. 
+ loadSchedules(); + execution.start(); + LOGGER.debug( "Activated" ); + } - // WARN Watch this code, see if we can do better, maybe leverage @UnitOfWorkRetry - @Override - public void run() - { - Usecase usecase = UsecaseBuilder.newUsecase( "ScheduleRunner" ); - UnitOfWork uow = module.newUnitOfWork( usecase ); - Schedule schedule = null; - try - { - schedule = uow.get( Schedule.class, this.schedule.scheduleIdentity ); - Task task = schedule.task().get(); - try - { - schedule.taskStarting(); - task.run(); - schedule.taskCompletedSuccessfully(); - } - catch( RuntimeException ex ) - { - schedule.taskCompletedWithException( ex ); - } - uow.complete(); - } - catch( UnitOfWorkCompletionException ex ) - { - } - finally - { - if( schedule != null ) - { - schedulerMixin.dispatchForExecution( schedule ); - } - // What should we do if we can't manage the Running flag?? - if( uow.isOpen() ) - { - uow.discard(); - } - } - } + @Override + public void passivateService() + throws Exception + { + execution.stop(); + LOGGER.debug( "Passivated" ); } } http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerService.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerService.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerService.java index 1200594..0d1ef4c 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerService.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerService.java @@ -18,61 +18,14 @@ */ package org.apache.zest.library.scheduler; -import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.ThreadPoolExecutor; -import java.util.concurrent.atomic.AtomicInteger; import org.apache.zest.api.entity.Identity; -import 
org.apache.zest.api.injection.scope.This; import org.apache.zest.api.mixin.Mixins; import org.apache.zest.api.service.ServiceActivation; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.zest.library.scheduler.defaults.DefaultRejectionHandler; +import org.apache.zest.library.scheduler.defaults.DefaultThreadFactory; -@Mixins( { SchedulerMixin.class, SchedulerService.ThreadFactory.class, SchedulerService.RejectionHandler.class } ) +@Mixins( { SchedulerMixin.class, DefaultThreadFactory.class, DefaultRejectionHandler.class } ) public interface SchedulerService extends Scheduler, ServiceActivation, Identity { - class RejectionHandler - implements RejectedExecutionHandler - { - private static final Logger LOGGER = LoggerFactory.getLogger( SchedulerService.class ); - - @Override - public void rejectedExecution( Runnable r, ThreadPoolExecutor executor ) - { - LOGGER.error( "Runnable [" + r + "] was rejected by executor [" + executor + "]" ); - } - } - - class ThreadFactory - implements java.util.concurrent.ThreadFactory - { - private static final AtomicInteger POOL_NUMBER = new AtomicInteger( 1 ); - private final ThreadGroup group; - private final AtomicInteger threadNumber = new AtomicInteger( 1 ); - private final String namePrefix; - - protected ThreadFactory( @This SchedulerService me ) - { - SecurityManager sm = System.getSecurityManager(); - group = ( sm != null ) ? 
sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); - namePrefix = me.identity().get() + "-P" + POOL_NUMBER.getAndIncrement() + "W"; - } - - @Override - public Thread newThread( Runnable runnable ) - { - Thread thread = new Thread( group, runnable, namePrefix + threadNumber.getAndIncrement(), 0 ); - if( thread.isDaemon() ) - { - thread.setDaemon( false ); - } - if( thread.getPriority() != Thread.NORM_PRIORITY ) - { - thread.setPriority( Thread.NORM_PRIORITY ); - } - return thread; - } - } - } http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulesHandler.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulesHandler.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulesHandler.java new file mode 100644 index 0000000..b76ef96 --- /dev/null +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulesHandler.java @@ -0,0 +1,89 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.zest.library.scheduler; + +import org.apache.zest.api.entity.Identity; +import org.apache.zest.api.injection.scope.Structure; +import org.apache.zest.api.injection.scope.This; +import org.apache.zest.api.mixin.Mixins; +import org.apache.zest.api.structure.Module; +import org.apache.zest.api.unitofwork.NoSuchEntityException; +import org.apache.zest.api.unitofwork.UnitOfWork; +import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation; +import org.apache.zest.library.scheduler.schedule.Schedules; + +@Mixins(SchedulesHandler.SchedulesHandlerMixin.class) +public interface SchedulesHandler +{ + @UnitOfWorkPropagation( UnitOfWorkPropagation.Propagation.MANDATORY) + Schedules getActiveSchedules(); + + @UnitOfWorkPropagation( UnitOfWorkPropagation.Propagation.MANDATORY) + Schedules getCancelledSchedules(); + + class SchedulesHandlerMixin implements SchedulesHandler + { + @This + private Identity me; + + @Structure + private Module module; + + @Override + public Schedules getActiveSchedules() + { + return getOrCreateSchedules(getActiveSchedulesIdentity()); + } + + @Override + public Schedules getCancelledSchedules() + { + return getOrCreateSchedules(getCancelledSchedulesIdentity()); + } + + public String getActiveSchedulesIdentity() + { + return "Schedules-Active:" + me.identity().get(); + } + + public String getCancelledSchedulesIdentity() + { + return "Schedules-Cancelled:" + me.identity().get(); + } + + private Schedules getOrCreateSchedules( String identity ){ + UnitOfWork uow = module.currentUnitOfWork(); + Schedules schedules; + try + { + schedules = uow.get( Schedules.class, identity ); + } + catch( NoSuchEntityException e ) + { + // Create a new Schedules entity for keeping track of them all. 
+ schedules = uow.newEntity( Schedules.class, identity ); + } + return schedules; + + } + + } +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Task.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Task.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Task.java index 6832ce5..6cc284c 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Task.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Task.java @@ -20,17 +20,28 @@ package org.apache.zest.library.scheduler; import java.util.List; import org.apache.zest.api.common.UseDefaults; +import org.apache.zest.api.concern.Concerns; import org.apache.zest.api.property.Property; import org.apache.zest.api.unitofwork.UnitOfWork; +import org.apache.zest.api.unitofwork.concern.UnitOfWorkConcern; +import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation; /** * Compose an Entity using this type to be able to Schedule it. + *<p> + * A Task is associated from a {@link org.apache.zest.library.scheduler.schedule.Schedule}, and when it is time to execute + * the SchedulerService will dispatch a TaskRunner in a new thread, and establish a UnitOfWork (Usecase name of "Task Runner"). + *</p> + *<p> + * The {@code Task} type declares the {@link UnitOfWorkConcern} and therefore the {@code Task} implementation may + * declare the {@link UnitOfWorkPropagation} annotation with the + * {@link org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation.Propagation#REQUIRES_NEW} and a different + * {@link UnitOfWork} strategy, such as {@code Retries} and {@code DiscardOn}. * - * A Task is wrapped in a {@link org.apache.zest.library.scheduler.SchedulerMixin.ScheduleRunner} before being run by an executor. 
- * {@link org.apache.zest.library.scheduler.SchedulerMixin.ScheduleRunner} wrap a {@link UnitOfWork} around the {@link Task#run()} invocation. + *</p> * * Here is a simple example: - * <pre> + * <pre><code> * interface MyTask * extends Task * { @@ -42,15 +53,18 @@ import org.apache.zest.api.unitofwork.UnitOfWork; * implements Runnable * { * @This MyTaskEntity me; + * * public void run() * { * me.customState().set( me.anotherEntity().get().doSomeStuff( me.customState().get() ) ); * } * } - * </pre> + * </code></pre> + * * Finally, {@literal MyTask} must be assembled into an {@literal EntityComposite}. */ // START SNIPPET: task +@Concerns( UnitOfWorkConcern.class ) public interface Task extends Runnable { @@ -58,5 +72,6 @@ public interface Task @UseDefaults Property<List<String>> tags(); + } // END SNIPPET: task http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java new file mode 100644 index 0000000..8beacee --- /dev/null +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.zest.library.scheduler; + +import java.lang.reflect.UndeclaredThrowableException; +import org.apache.zest.api.injection.scope.Structure; +import org.apache.zest.api.injection.scope.Uses; +import org.apache.zest.api.structure.Module; +import org.apache.zest.api.unitofwork.UnitOfWork; +import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation; +import org.apache.zest.library.scheduler.schedule.Schedule; +import org.apache.zest.library.scheduler.schedule.ScheduleTime; + +public class TaskRunner + implements Runnable +{ + @Structure + private Module module; + + @Uses + private ScheduleTime schedule; + + @Override + @UnitOfWorkPropagation( usecase = "Task Runner" ) + public void run() + { + try + { + UnitOfWork uow = module.currentUnitOfWork(); + Schedule schedule = uow.get( Schedule.class, this.schedule.scheduleIdentity() ); + Task task = schedule.task().get(); + try + { + schedule.taskStarting(); + task.run(); + schedule.taskCompletedSuccessfully(); + } + catch( RuntimeException ex ) + { + schedule.taskCompletedWithException( ex ); + schedule.exceptionCounter().set( schedule.exceptionCounter().get() + 1 ); + } + schedule.executionCounter().set( schedule.executionCounter().get() + 1 ); + uow.complete(); + } + catch( Exception e ) + { + throw new UndeclaredThrowableException( e ); + } + } +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/bootstrap/SchedulerAssembler.java 
---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/bootstrap/SchedulerAssembler.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/bootstrap/SchedulerAssembler.java index 92a7aa9..af5f475 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/bootstrap/SchedulerAssembler.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/bootstrap/SchedulerAssembler.java @@ -18,6 +18,7 @@ */ package org.apache.zest.library.scheduler.bootstrap; +import org.apache.zest.api.unitofwork.concern.UnitOfWorkConcern; import org.apache.zest.bootstrap.Assemblers; import org.apache.zest.bootstrap.AssemblyException; import org.apache.zest.bootstrap.EntityDeclaration; @@ -26,6 +27,7 @@ import org.apache.zest.bootstrap.ServiceDeclaration; import org.apache.zest.bootstrap.ValueDeclaration; import org.apache.zest.library.scheduler.SchedulerConfiguration; import org.apache.zest.library.scheduler.SchedulerService; +import org.apache.zest.library.scheduler.TaskRunner; import org.apache.zest.library.scheduler.schedule.ScheduleFactory; import org.apache.zest.library.scheduler.schedule.Schedules; import org.apache.zest.library.scheduler.schedule.cron.CronSchedule; @@ -55,6 +57,10 @@ import org.apache.zest.library.scheduler.timeline.TimelineSchedulerServiceMixin; public class SchedulerAssembler extends Assemblers.VisibilityConfig<SchedulerAssembler> { + + private static final int DEFAULT_WORKERS_COUNT = Runtime.getRuntime().availableProcessors() + 1; + private static final int DEFAULT_WORKQUEUE_SIZE = 10; + private boolean timeline; /** @@ -82,6 +88,8 @@ public class SchedulerAssembler .visibleIn( visibility() ) .instantiateOnStartup(); + assembly.transients( Runnable.class ).withMixins( TaskRunner.class ).withConcerns( UnitOfWorkConcern.class ); + if( timeline ) { scheduleEntities.withTypes( Timeline.class ) @@ -99,7 +107,11 @@ 
public class SchedulerAssembler if( hasConfig() ) { - configModule().entities( SchedulerConfiguration.class ).visibleIn( configVisibility() ); + configModule().entities( SchedulerConfiguration.class ) + .visibleIn( configVisibility() ); + SchedulerConfiguration defaults = assembly.forMixin( SchedulerConfiguration.class ).declareDefaults(); + defaults.workersCount().set( DEFAULT_WORKERS_COUNT ); + defaults.workQueueSize().set( DEFAULT_WORKQUEUE_SIZE ); } } } http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultRejectionHandler.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultRejectionHandler.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultRejectionHandler.java new file mode 100644 index 0000000..9a8e631 --- /dev/null +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultRejectionHandler.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.zest.library.scheduler.defaults; + +import java.util.concurrent.RejectedExecutionHandler; +import java.util.concurrent.ThreadPoolExecutor; +import org.apache.zest.library.scheduler.SchedulerService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultRejectionHandler + implements RejectedExecutionHandler +{ + private static final Logger LOGGER = LoggerFactory.getLogger( SchedulerService.class ); + + @Override + public void rejectedExecution( Runnable r, ThreadPoolExecutor executor ) + { + LOGGER.error( "Runnable [" + r + "] was rejected by executor [" + executor + "]" ); + } +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultScheduleFactoryMixin.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultScheduleFactoryMixin.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultScheduleFactoryMixin.java new file mode 100644 index 0000000..f05c041 --- /dev/null +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultScheduleFactoryMixin.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.zest.library.scheduler.defaults; + +import org.apache.zest.api.entity.EntityBuilder; +import org.apache.zest.api.injection.scope.Service; +import org.apache.zest.api.injection.scope.Structure; +import org.apache.zest.api.structure.Module; +import org.apache.zest.api.unitofwork.UnitOfWork; +import org.apache.zest.library.scheduler.SchedulerService; +import org.apache.zest.library.scheduler.Task; +import org.apache.zest.library.scheduler.schedule.Schedule; +import org.apache.zest.library.scheduler.schedule.ScheduleFactory; +import org.apache.zest.library.scheduler.schedule.cron.CronSchedule; +import org.apache.zest.library.scheduler.schedule.once.OnceSchedule; +import org.apache.zest.spi.uuid.UuidIdentityGeneratorService; +import org.joda.time.DateTime; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class DefaultScheduleFactoryMixin + implements ScheduleFactory +{ + private static final Logger logger = LoggerFactory.getLogger( ScheduleFactory.class ); + + @Structure + private Module module; + + @Service + private SchedulerService scheduler; + + @Service + private UuidIdentityGeneratorService uuid; + + @Override + public CronSchedule newCronSchedule( Task task, String cronExpression, DateTime start ) + { + return newPersistentCronSchedule( task, cronExpression, start ); + } + + @Override + public Schedule newOnceSchedule( Task task, DateTime runAt ) + { + return newPersistentOnceSchedule( task, runAt ); + } + + private CronSchedule newPersistentCronSchedule( Task task, String cronExpression, 
DateTime start ) + { + UnitOfWork uow = module.currentUnitOfWork(); + EntityBuilder<CronSchedule> builder = uow.newEntityBuilder( CronSchedule.class ); + CronSchedule instance = builder.instance(); + instance.task().set( task ); + instance.start().set( start ); + instance.identity().set( uuid.generate( CronSchedule.class ) ); + instance.cronExpression().set( cronExpression ); + CronSchedule schedule = builder.newInstance(); + logger.info( "Schedule {} created: {}", schedule.presentationString(), schedule.identity().get() ); + return schedule; + } + + private Schedule newPersistentOnceSchedule( Task task, DateTime runAt ) + { + UnitOfWork uow = module.currentUnitOfWork(); + EntityBuilder<OnceSchedule> builder = uow.newEntityBuilder( OnceSchedule.class ); + OnceSchedule builderInstance = builder.instance(); + builderInstance.task().set( task ); + builderInstance.start().set( runAt ); + builderInstance.identity().set( uuid.generate( OnceSchedule.class ) ); + OnceSchedule schedule = builder.newInstance(); + logger.info( "Schedule {} created: {}", schedule.presentationString(), schedule.identity().get() ); + return schedule; + } +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultThreadFactory.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultThreadFactory.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultThreadFactory.java new file mode 100644 index 0000000..c834f50 --- /dev/null +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/defaults/DefaultThreadFactory.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.zest.library.scheduler.defaults; + +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.zest.api.injection.scope.This; +import org.apache.zest.library.scheduler.SchedulerService; + +public class DefaultThreadFactory + implements java.util.concurrent.ThreadFactory +{ + private static final AtomicInteger POOL_NUMBER = new AtomicInteger( 1 ); + private final ThreadGroup group; + private final AtomicInteger threadNumber = new AtomicInteger( 1 ); + private final String namePrefix; + + protected DefaultThreadFactory( @This SchedulerService me ) + { + SecurityManager sm = System.getSecurityManager(); + group = ( sm != null ) ? 
sm.getThreadGroup() : Thread.currentThread().getThreadGroup(); + namePrefix = me.identity().get() + "-P" + POOL_NUMBER.getAndIncrement() + "W"; + } + + @Override + public Thread newThread( Runnable runnable ) + { + Thread thread = new Thread( group, runnable, namePrefix + threadNumber.getAndIncrement(), 0 ); + if( thread.isDaemon() ) + { + thread.setDaemon( false ); + } + if( thread.getPriority() != Thread.NORM_PRIORITY ) + { + thread.setPriority( Thread.NORM_PRIORITY ); + } + return thread; + } +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java index 9427be3..d2da51b 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/Schedule.java @@ -18,6 +18,8 @@ */ package org.apache.zest.library.scheduler.schedule; +import org.apache.zest.api.common.UseDefaults; +import org.apache.zest.api.entity.EntityComposite; import org.joda.time.DateTime; import org.apache.zest.api.association.Association; import org.apache.zest.api.entity.Identity; @@ -28,8 +30,7 @@ import org.apache.zest.library.scheduler.Task; /** * Represent the scheduling of a {@link Task}. */ -public interface Schedule - extends Identity +public interface Schedule extends EntityComposite { /** * @return The Association to the Task to be executed when it is time. @@ -43,6 +44,50 @@ public interface Schedule @Immutable Property<DateTime> start(); + /** Returns true if the Schedule has been cancelled. + * + * @return true if the Schedule has been cancelled. 
+ */ + @UseDefaults + Property<Boolean> cancelled(); + + /** Returns true if the Schedule is currently running. + * + * @return true if the Schedule is currently running. + */ + @UseDefaults + Property<Boolean> running(); + + /** Returns the number of times the {@link Task} has been executed. + * <p> + * Each time the {@link Task#run} method completes, with or without an {@link Exception}, this + * counter is incremented by 1. + * </p> + * + * @return the number of times the {@link Task} has been executed. + */ + @UseDefaults + Property<Long> executionCounter(); + + /** Returns the number of Exceptions that have occurred when running the {@link Task}. + * <p> + * Each time the {@link Task#run} method throws a {@link RuntimeException}, this property + * is incremented by 1. + * </p> + * + * @return the number of Exceptions that have occurred when running the {@link Task}. + */ + @UseDefaults + Property<Long> exceptionCounter(); + + /** Returns true if the Schedule is done and will not be executed any more times. + * + * @return true if the Schedule is done and will not be executed any more times. + */ + @UseDefaults + Property<Boolean> done(); + + /** * Called just before the {@link org.apache.zest.library.scheduler.Task#run()} method is called. */ @@ -62,11 +107,6 @@ public interface Schedule void taskCompletedWithException( RuntimeException ex ); /** - * @return True if the associated {@link org.apache.zest.library.scheduler.Task} is currently running, false otherwise - */ - boolean isTaskRunning(); - - /** * Compute the next time this schedule is to be run. * * @param from The starting time when to look for the next time it will run. @@ -81,4 +121,5 @@ public interface Schedule * @return A String representing this schedule. 
*/ String presentationString(); + } http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleFactory.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleFactory.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleFactory.java index 8506d4b..133ec1c 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleFactory.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleFactory.java @@ -15,114 +15,23 @@ */ package org.apache.zest.library.scheduler.schedule; +import org.apache.zest.api.concern.Concerns; +import org.apache.zest.api.unitofwork.concern.UnitOfWorkConcern; +import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation; +import org.apache.zest.library.scheduler.defaults.DefaultScheduleFactoryMixin; import org.joda.time.DateTime; -import org.apache.zest.api.entity.EntityBuilder; -import org.apache.zest.api.injection.scope.Service; -import org.apache.zest.api.injection.scope.Structure; import org.apache.zest.api.mixin.Mixins; -import org.apache.zest.api.structure.Module; -import org.apache.zest.api.unitofwork.UnitOfWork; -import org.apache.zest.api.value.ValueBuilder; -import org.apache.zest.library.scheduler.SchedulerService; import org.apache.zest.library.scheduler.Task; -import org.apache.zest.library.scheduler.schedule.cron.CronSchedule; -import org.apache.zest.library.scheduler.schedule.once.OnceSchedule; -import org.apache.zest.spi.uuid.UuidIdentityGeneratorService; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -@Mixins( ScheduleFactory.Mixin.class ) +import static org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation.Propagation.MANDATORY; + +@Mixins( DefaultScheduleFactoryMixin.class ) +@Concerns( 
UnitOfWorkConcern.class ) public interface ScheduleFactory { - Schedule newCronSchedule( Task task, String cronExpression, DateTime start, boolean durable ); - - Schedule newOnceSchedule( Task task, DateTime runAt, boolean durable ); - - class Mixin - implements ScheduleFactory - { - private static final Logger logger = LoggerFactory.getLogger( ScheduleFactory.class ); - - @Structure - private Module module; - - @Service - private SchedulerService scheduler; - - @Service - private UuidIdentityGeneratorService uuid; - - @Override - public CronSchedule newCronSchedule( Task task, String cronExpression, DateTime start, boolean durable ) - { - if( durable ) - { - return newPersistentCronSchedule( task, cronExpression, start ); - } - return newTransientCronSchedule( task, cronExpression, start ); - } - - @Override - public Schedule newOnceSchedule( Task task, DateTime runAt, boolean durable ) - { - if( durable ) - { - return newPersistentOnceSchedule( task, runAt ); - } - return newTransientOnceSchedule( task, runAt ); - } - - private CronSchedule newTransientCronSchedule( Task task, String cronExpression, DateTime start ) - { - ValueBuilder<CronSchedule> builder = module.newValueBuilder( CronSchedule.class ); - CronSchedule prototype = builder.prototype(); - prototype.task().set( task ); - prototype.start().set( start ); - prototype.identity().set( uuid.generate( CronSchedule.class ) ); - prototype.cronExpression().set( cronExpression ); - CronSchedule schedule = builder.newInstance(); - logger.info( "Schedule {} created: {}", schedule.presentationString(), schedule.identity().get() ); - return schedule; - } - - private CronSchedule newPersistentCronSchedule( Task task, String cronExpression, DateTime start ) - { - UnitOfWork uow = module.currentUnitOfWork(); - EntityBuilder<CronSchedule> builder = uow.newEntityBuilder( CronSchedule.class ); - CronSchedule builderInstance = builder.instance(); - builderInstance.task().set( task ); - builderInstance.start().set( start 
); - builderInstance.identity().set( uuid.generate( CronSchedule.class ) ); - builderInstance.cronExpression().set( cronExpression ); - CronSchedule schedule = builder.newInstance(); - logger.info( "Schedule {} created: {}", schedule.presentationString(), schedule.identity().get() ); - return schedule; - } - - private Schedule newTransientOnceSchedule( Task task, DateTime runAt ) - { - ValueBuilder<OnceSchedule> builder = module.newValueBuilder( OnceSchedule.class ); - OnceSchedule builderInstance = builder.prototype(); - builderInstance.task().set( task ); - builderInstance.start().set( runAt ); - builderInstance.identity().set( uuid.generate( CronSchedule.class ) ); - OnceSchedule schedule = builder.newInstance(); - logger.info( "Schedule {} created: {}", schedule.presentationString(), schedule.identity().get() ); - return schedule; - } - - private Schedule newPersistentOnceSchedule( Task task, DateTime runAt ) - { - UnitOfWork uow = module.currentUnitOfWork(); - EntityBuilder<OnceSchedule> builder = uow.newEntityBuilder( OnceSchedule.class ); - OnceSchedule builderInstance = builder.instance(); - builderInstance.task().set( task ); - builderInstance.start().set( runAt ); - builderInstance.identity().set( uuid.generate( CronSchedule.class ) ); - OnceSchedule schedule = builder.newInstance(); - logger.info( "Schedule {} created: {}", schedule.presentationString(), schedule.identity().get() ); - return schedule; - } - } + @UnitOfWorkPropagation( MANDATORY) + Schedule newCronSchedule( Task task, String cronExpression, DateTime start ); + @UnitOfWorkPropagation( MANDATORY) + Schedule newOnceSchedule( Task task, DateTime runAt ); } http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleTime.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleTime.java 
b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleTime.java index 0560f9b..ed38cd6 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleTime.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/ScheduleTime.java @@ -16,18 +16,17 @@ */ package org.apache.zest.library.scheduler.schedule; +import org.apache.zest.api.util.NullArgumentException; + public final class ScheduleTime implements Comparable<ScheduleTime> { - public String scheduleIdentity; - public long nextTime; + private String scheduleIdentity; + private long nextTime; public ScheduleTime( String scheduleIdentity, long nextTime ) { - if( scheduleIdentity == null ) - { - throw new IllegalArgumentException( "null not allowed: " + scheduleIdentity ); - } + NullArgumentException.validateNotEmpty( "scheduleIdentity", scheduleIdentity ); this.scheduleIdentity = scheduleIdentity; this.nextTime = nextTime; } @@ -59,6 +58,16 @@ public final class ScheduleTime return result; } + public long nextTime() + { + return nextTime; + } + + public String scheduleIdentity() + { + return scheduleIdentity; + } + @Override public int compareTo( ScheduleTime another ) { http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java index 84deb7a..d137cb3 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java @@ -42,24 +42,23 @@ public interface CronSchedule implements 
CronSchedule { private static final Logger LOGGER = LoggerFactory.getLogger( Schedule.class ); - private boolean running; @Override public void taskStarting() { - running = true; + running().set( true ); } @Override public void taskCompletedSuccessfully() { - running = false; + running().set(false); } @Override public void taskCompletedWithException( RuntimeException ex ) { - running = false; + running().set(false); } @Override @@ -69,13 +68,6 @@ public interface CronSchedule } @Override - public boolean isTaskRunning() - { - // See SchedulerMixin.ScheduleRunner::run - return false; - } - - @Override public long nextRun( long from ) { long actualFrom = from; @@ -84,11 +76,15 @@ public interface CronSchedule { actualFrom = firstRun; } - Long nextRun = new org.codeartisans.sked.cron.CronSchedule( cronExpression().get() ) - .firstRunAfter( actualFrom ); + Long nextRun = createCron().firstRunAfter( actualFrom ); LOGGER.info( "CronSchedule::nextRun({}) is {}", from, firstRun ); return nextRun; } + + private org.codeartisans.sked.cron.CronSchedule createCron() + { + return new org.codeartisans.sked.cron.CronSchedule( cronExpression().get() ); + } } } http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java index 52a63ca..66fdb21 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java @@ -26,41 +26,38 @@ public interface OnceSchedule abstract class OnceScheduleMixin implements OnceSchedule { - private boolean running; - 
@Override public void taskStarting() { - running = true; + running().set( true ); } @Override public void taskCompletedSuccessfully() { - running = false; + running().set( false ); } @Override public void taskCompletedWithException( RuntimeException ex ) { - running = false; - } - - @Override - public boolean isTaskRunning() - { - return running; + running().set( false ); } @Override public long nextRun( long from ) { + if( done().get() ) + { + return Long.MIN_VALUE; + } + done().set( true ); long runAt = start().get().getMillis(); if( runAt >= from ) { return runAt; } - return -1; + return from; } @Override @@ -69,5 +66,4 @@ public interface OnceSchedule return start().get().toString(); } } - } http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/Timeline.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/Timeline.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/Timeline.java index 798e451..1c2e7e7 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/Timeline.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/Timeline.java @@ -18,7 +18,7 @@ */ package org.apache.zest.library.scheduler.timeline; -import org.joda.time.DateTime; +import java.time.ZonedDateTime; import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation; /** @@ -57,7 +57,7 @@ public interface Timeline */ @UnitOfWorkPropagation( UnitOfWorkPropagation.Propagation.MANDATORY ) // START SNIPPET: timeline - Iterable<TimelineRecord> getRecords( DateTime from, DateTime to ); + Iterable<TimelineRecord> getRecords( ZonedDateTime from, ZonedDateTime to ); // END SNIPPET: timeline /** 
http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineScheduleMixin.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineScheduleMixin.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineScheduleMixin.java index 3a4c024..d66898e 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineScheduleMixin.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineScheduleMixin.java @@ -15,6 +15,7 @@ */ package org.apache.zest.library.scheduler.timeline; +import java.time.ZonedDateTime; import java.util.Collection; import java.util.Collections; import java.util.List; @@ -70,9 +71,9 @@ public class TimelineScheduleMixin } @Override - public Iterable<TimelineRecord> getRecords( DateTime from, DateTime to ) + public Iterable<TimelineRecord> getRecords( ZonedDateTime from, ZonedDateTime to ) { - return getRecords( from.getMillis(), to.getMillis() ); + return getRecords( from.toInstant().toEpochMilli(), to.toInstant().toEpochMilli() ); } @Override http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineSchedulerServiceMixin.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineSchedulerServiceMixin.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineSchedulerServiceMixin.java index 1823cb0..3e097b4 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineSchedulerServiceMixin.java +++ 
b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/timeline/TimelineSchedulerServiceMixin.java @@ -15,17 +15,16 @@ */ package org.apache.zest.library.scheduler.timeline; +import java.time.ZonedDateTime; import java.util.SortedSet; import java.util.TreeSet; -import org.joda.time.DateTime; -import org.apache.zest.api.injection.scope.Service; import org.apache.zest.api.injection.scope.Structure; +import org.apache.zest.api.injection.scope.This; import org.apache.zest.api.service.ServiceComposite; import org.apache.zest.api.structure.Module; -import org.apache.zest.api.unitofwork.UnitOfWork; import org.apache.zest.functional.Iterables; -import org.apache.zest.library.scheduler.SchedulerMixin; import org.apache.zest.library.scheduler.SchedulerService; +import org.apache.zest.library.scheduler.SchedulesHandler; import org.apache.zest.library.scheduler.schedule.Schedule; import org.apache.zest.library.scheduler.schedule.Schedules; @@ -39,17 +38,18 @@ public abstract class TimelineSchedulerServiceMixin @Structure private Module module; - @Service + @This private SchedulerService scheduler; + @This + private SchedulesHandler schedulesHandler; + @Override public Iterable<TimelineRecord> getLastRecords( int maxResults ) { SortedSet<TimelineRecord> result = new TreeSet<>(); - UnitOfWork uow = module.currentUnitOfWork(); - String schedulesName = SchedulerMixin.getSchedulesIdentity( scheduler ); - Schedules schedules = uow.get( Schedules.class, schedulesName ); + Schedules schedules = schedulesHandler.getActiveSchedules(); for( Schedule schedule : schedules.schedules() ) { Timeline timeline = (Timeline) schedule; @@ -63,9 +63,7 @@ public abstract class TimelineSchedulerServiceMixin public Iterable<TimelineRecord> getNextRecords( int maxResults ) { SortedSet<TimelineRecord> result = new TreeSet<>(); - UnitOfWork uow = module.currentUnitOfWork(); - String schedulesName = SchedulerMixin.getSchedulesIdentity( scheduler ); - Schedules schedules = uow.get( 
Schedules.class, schedulesName ); + Schedules schedules = schedulesHandler.getActiveSchedules(); for( Schedule schedule : schedules.schedules() ) { Timeline timeline = (Timeline) schedule; @@ -76,13 +74,11 @@ public abstract class TimelineSchedulerServiceMixin } @Override - public Iterable<TimelineRecord> getRecords( DateTime from, DateTime to ) + public Iterable<TimelineRecord> getRecords( ZonedDateTime from, ZonedDateTime to ) { SortedSet<TimelineRecord> result = new TreeSet<>(); - UnitOfWork uow = module.currentUnitOfWork(); - String schedulesName = SchedulerMixin.getSchedulesIdentity( scheduler ); - Schedules schedules = uow.get( Schedules.class, schedulesName ); + Schedules schedules = schedulesHandler.getActiveSchedules(); for( Schedule schedule : schedules.schedules() ) { Timeline timeline = (Timeline) schedule; @@ -97,9 +93,7 @@ public abstract class TimelineSchedulerServiceMixin { SortedSet<TimelineRecord> result = new TreeSet<>(); - UnitOfWork uow = module.currentUnitOfWork(); - String schedulesName = SchedulerMixin.getSchedulesIdentity( scheduler ); - Schedules schedules = uow.get( Schedules.class, schedulesName ); + Schedules schedules = schedulesHandler.getActiveSchedules(); for( Schedule schedule : schedules.schedules() ) { Timeline timeline = (Timeline) schedule; http://git-wip-us.apache.org/repos/asf/zest-java/blob/02491a34/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/AbstractSchedulerTest.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/AbstractSchedulerTest.java b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/AbstractSchedulerTest.java index ca5b8bd..02d5636 100644 --- a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/AbstractSchedulerTest.java +++ b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/AbstractSchedulerTest.java @@ -18,12 +18,18 @@ package 
org.apache.zest.library.scheduler; import org.apache.zest.api.entity.EntityBuilder; +import org.apache.zest.api.entity.IdentityGenerator; import org.apache.zest.api.unitofwork.UnitOfWork; +import org.apache.zest.api.value.ValueSerialization; import org.apache.zest.bootstrap.AssemblyException; import org.apache.zest.bootstrap.ModuleAssembly; +import org.apache.zest.bootstrap.ServiceDeclaration; +import org.apache.zest.entitystore.memory.MemoryEntityStoreService; import org.apache.zest.index.rdf.assembly.RdfMemoryStoreAssembler; +import org.apache.zest.spi.uuid.UuidIdentityGeneratorService; import org.apache.zest.test.AbstractZestTest; import org.apache.zest.test.EntityTestAssembler; +import org.apache.zest.valueserialization.orgjson.OrgJsonValueSerializationService; public abstract class AbstractSchedulerTest extends AbstractZestTest @@ -34,7 +40,9 @@ public abstract class AbstractSchedulerTest { assembly.entities( FooTask.class ); - new EntityTestAssembler().assemble( assembly ); + assembly.services( MemoryEntityStoreService.class ); + assembly.services( UuidIdentityGeneratorService.class).withMixins( CountingIdentityGeneratorService.class ); + assembly.services( OrgJsonValueSerializationService.class ).taggedWith( ValueSerialization.Formats.JSON ); new RdfMemoryStoreAssembler().assemble( assembly ); onAssembly( assembly ); @@ -51,4 +59,16 @@ public abstract class AbstractSchedulerTest task.input().set( input ); return builder.newInstance(); } + + public static class CountingIdentityGeneratorService + implements IdentityGenerator + { + int counter = 0; + + @Override + public String generate( Class<?> compositeType ) + { + return compositeType.getSimpleName() + ":" + counter++; + } + } }
