Repository: zest-java Updated Branches: refs/heads/develop d85a93fe8 -> 73b7f1969
ZEST-128 - Chasing race condition problems. Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/551f6846 Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/551f6846 Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/551f6846 Branch: refs/heads/develop Commit: 551f68463b594aa95768fc884154871436c0edf8 Parents: d85a93f Author: Niclas Hedhman <[email protected]> Authored: Sat Nov 14 18:56:19 2015 +0800 Committer: Niclas Hedhman <[email protected]> Committed: Sat Nov 14 18:56:19 2015 +0800 ---------------------------------------------------------------------- .../association/AssociationAssignmentTest.java | 83 ++++++++++++++++++++ .../zest/library/scheduler/Execution.java | 37 ++++----- .../zest/library/scheduler/SchedulerMixin.java | 2 +- .../zest/library/scheduler/TaskRunner.java | 29 ++++++- .../scheduler/schedule/cron/CronSchedule.java | 3 - .../scheduler/schedule/once/OnceSchedule.java | 3 - .../library/scheduler/CronScheduleTest.java | 82 +++++++++++++++++++ 7 files changed, 207 insertions(+), 32 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/core/runtime/src/test/java/org/apache/zest/runtime/association/AssociationAssignmentTest.java ---------------------------------------------------------------------- diff --git a/core/runtime/src/test/java/org/apache/zest/runtime/association/AssociationAssignmentTest.java b/core/runtime/src/test/java/org/apache/zest/runtime/association/AssociationAssignmentTest.java new file mode 100644 index 0000000..e224008 --- /dev/null +++ b/core/runtime/src/test/java/org/apache/zest/runtime/association/AssociationAssignmentTest.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.zest.runtime.association; + +import org.apache.zest.api.association.Association; +import org.apache.zest.api.entity.EntityBuilder; +import org.apache.zest.api.entity.EntityComposite; +import org.apache.zest.api.unitofwork.UnitOfWork; +import org.apache.zest.bootstrap.AssemblyException; +import org.apache.zest.bootstrap.ModuleAssembly; +import org.apache.zest.entitystore.memory.MemoryEntityStoreService; +import org.apache.zest.spi.uuid.UuidIdentityGeneratorService; +import org.apache.zest.test.AbstractZestTest; +import org.apache.zest.valueserialization.orgjson.OrgJsonValueSerializationAssembler; +import org.junit.Test; + +import static org.hamcrest.core.IsEqual.equalTo; +import static org.hamcrest.core.IsNull.notNullValue; +import static org.junit.Assert.assertThat; + +public class AssociationAssignmentTest extends AbstractZestTest +{ + + @Override + public void assemble( ModuleAssembly module ) + throws AssemblyException + { + module.services( MemoryEntityStoreService.class ); + module.services( UuidIdentityGeneratorService.class ); + new OrgJsonValueSerializationAssembler().assemble( module ); + module.entities( TheAssociatedType.class ); + module.entities( TheMainType.class ); + } + + @Test + public void givenAssignmentOfAssociationAtCreationWhenDereferencingAssocationExpectCorrectValue() + throws Exception + { + UnitOfWork work = module.newUnitOfWork(); + TheAssociatedType entity1 = work.newEntity( TheAssociatedType.class ); + EntityBuilder<TheMainType> builder = work.newEntityBuilder( TheMainType.class ); + builder.instance().assoc().set( entity1 ); + TheMainType entity2 = builder.newInstance(); + String id1 = entity1.identity().get(); + String id2 = entity2.identity().get(); + work.complete(); + assertThat(id1, notNullValue()); + assertThat(id2, notNullValue()); + + work = module.newUnitOfWork(); + TheMainType entity3 = work.get(TheMainType.class, id2 ); + TheAssociatedType entity4 = entity3.assoc().get(); + assertThat( entity4.identity().get(), equalTo(id1)); + work.discard(); + } + + public interface TheAssociatedType extends EntityComposite + { + } + + public interface TheMainType extends EntityComposite + { + Association<TheAssociatedType> assoc(); + } +} http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java index 3bc120c..d3a732b 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java @@ -20,12 +20,10 @@ package org.apache.zest.library.scheduler; -import java.lang.reflect.UndeclaredThrowableException; import java.util.SortedSet; import java.util.TreeSet; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.RejectedExecutionHandler; -import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.ThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -37,7 +35,6 @@ import org.apache.zest.api.mixin.Mixins; import org.apache.zest.api.structure.Module; import org.apache.zest.api.unitofwork.NoSuchEntityException; import org.apache.zest.api.unitofwork.UnitOfWork; -import org.apache.zest.api.unitofwork.UnitOfWorkCompletionException; import org.apache.zest.api.unitofwork.concern.UnitOfWorkConcern; import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation; import org.apache.zest.api.unitofwork.concern.UnitOfWorkRetry; @@ -84,7 +81,7 @@ public interface Execution private final SortedSet<ScheduleTime> timingQueue = new TreeSet<>(); private volatile boolean running; private ThreadPoolExecutor taskExecutor; - private Thread scheduleThread; + private volatile Thread scheduleThread; @Override @UnitOfWorkPropagation @@ -114,6 +111,10 @@ public interface Execution { // Ignore. Used to signal "Hey, wake up. Time to work..." } + catch( Throwable e ) + { + e.printStackTrace(); + } } } } @@ -133,29 +134,21 @@ public interface Execution public void updateNextTime( ScheduleTime scheduleTime ) { long now = System.currentTimeMillis(); - - try (UnitOfWork uow = module.newUnitOfWork()) + UnitOfWork uow = module.currentUnitOfWork(); + try { - try - { - Schedule schedule = uow.get( Schedule.class, scheduleTime.scheduleIdentity() ); - long nextTime = schedule.nextRun( now ); - if( nextTime != Long.MIN_VALUE ) - { - scheduleTime = new ScheduleTime( schedule.identity().get(), nextTime ); - timingQueue.add( scheduleTime ); - } - } - catch( NoSuchEntityException e ) + Schedule schedule = uow.get( Schedule.class, scheduleTime.scheduleIdentity() ); + long nextTime = schedule.nextRun( now + 1000 ); + if( nextTime != Long.MIN_VALUE ) { - // Schedule has been removed. - scheduler.cancelSchedule( scheduleTime.scheduleIdentity() ); + scheduleTime = new ScheduleTime( schedule.identity().get(), nextTime ); + timingQueue.add( scheduleTime ); } - uow.complete(); } - catch( UnitOfWorkCompletionException e ) + catch( NoSuchEntityException e ) { - throw new UndeclaredThrowableException( e ); + // Schedule has been removed. + scheduler.cancelSchedule( scheduleTime.scheduleIdentity() ); } } http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java index 52c2f56..9f7423c 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java @@ -172,8 +172,8 @@ public class SchedulerMixin // Throws IllegalArgument if corePoolSize or keepAliveTime less than zero, // or if workersCount less than or equal to zero, // or if corePoolSize greater than workersCount. - loadSchedules(); execution.start(); + loadSchedules(); LOGGER.debug( "Activated" ); } http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java index 9f5d772..3e0e975 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java @@ -21,6 +21,7 @@ package org.apache.zest.library.scheduler; import java.lang.reflect.UndeclaredThrowableException; +import java.util.concurrent.locks.ReentrantLock; import org.apache.zest.api.injection.scope.Structure; import org.apache.zest.api.injection.scope.Uses; import org.apache.zest.api.structure.Module; @@ -32,6 +33,8 @@ import org.apache.zest.library.scheduler.schedule.ScheduleTime; public class TaskRunner implements Runnable { + private static ReentrantLock lock = new ReentrantLock(); + @Structure private Module module; @@ -42,30 +45,37 @@ public class TaskRunner public void run() { // TODO: (niclas) I am NOT happy with this implementation, requiring 3 UnitOfWorks to be created. 15-20 milliseconds on my MacBook. If there is a better way to detect overrun, two of those might not be needed. + UnitOfWork uow = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Task Runner initialize" ) ); try { - UnitOfWork uow = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Task Runner initialize" ) ); + lock.lock(); Schedule schedule = uow.get( Schedule.class, this.schedule.scheduleIdentity() ); if( !schedule.running().get() ) // check for overrun. { try { schedule.taskStarting(); + schedule.running().set( true ); uow.complete(); // This completion is needed to detect overrun + lock.unlock(); + uow = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Task Runner" ) ); schedule = uow.get( schedule ); // re-attach the entity to the new UnitOfWork Task task = schedule.task().get(); task.run(); - uow.complete(); // Need this to avoid ConcurrentModificationException when there has been an overrun. + uow.complete(); + lock.lock(); uow = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Task Runner conclude" ) ); schedule = uow.get( schedule ); // re-attach the entity to the new UnitOfWork + schedule.running().set( false ); schedule.taskCompletedSuccessfully(); + schedule.executionCounter().set( schedule.executionCounter().get() + 1 ); } catch( RuntimeException ex ) { + schedule.running().set( false ); processException( schedule, ex ); } - schedule.executionCounter().set( schedule.executionCounter().get() + 1 ); } else { @@ -75,8 +85,21 @@ public class TaskRunner } catch( Exception e ) { + e.printStackTrace(); throw new UndeclaredThrowableException( e ); } + finally + { + uow.discard(); + try + { + lock.unlock(); + } + catch( IllegalMonitorStateException e ) + { + // ignore, as it may happen on certain exceptions. + } + } } private void processException( Schedule schedule, RuntimeException ex ) http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java index 9036592..4597e75 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java @@ -46,19 +46,16 @@ public interface CronSchedule @Override public void taskStarting() { - running().set( true ); } @Override public void taskCompletedSuccessfully() { - running().set(false); } @Override public void taskCompletedWithException( Throwable ex ) { - running().set(false); } @Override http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java index ca31cf4..8c78cc6 100644 --- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java +++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java @@ -29,19 +29,16 @@ public interface OnceSchedule @Override public void taskStarting() { - running().set( true ); } @Override public void taskCompletedSuccessfully() { - running().set( false ); } @Override public void taskCompletedWithException( Throwable ex ) { - running().set( false ); } @Override http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/CronScheduleTest.java ---------------------------------------------------------------------- diff --git a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/CronScheduleTest.java b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/CronScheduleTest.java new file mode 100644 index 0000000..76063f0 --- /dev/null +++ b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/CronScheduleTest.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.zest.library.scheduler; + +import org.apache.zest.api.entity.EntityBuilder; +import org.apache.zest.api.unitofwork.UnitOfWork; +import org.apache.zest.bootstrap.AssemblyException; +import org.apache.zest.bootstrap.ModuleAssembly; +import org.apache.zest.entitystore.memory.MemoryEntityStoreService; +import org.apache.zest.library.scheduler.schedule.cron.CronSchedule; +import org.apache.zest.spi.uuid.UuidIdentityGeneratorService; +import org.apache.zest.test.AbstractZestTest; +import org.apache.zest.valueserialization.orgjson.OrgJsonValueSerializationAssembler; +import org.joda.time.DateTime; +import org.junit.Test; + +import static org.hamcrest.number.IsCloseTo.closeTo; +import static org.junit.Assert.assertThat; + +public class CronScheduleTest extends AbstractZestTest +{ + @Override + public void assemble( ModuleAssembly module ) + throws AssemblyException + { + new OrgJsonValueSerializationAssembler().assemble( module ); + module.services( MemoryEntityStoreService.class ); + module.services( UuidIdentityGeneratorService.class ); + module.entities( CronSchedule.class ); + module.entities( Task.class ).withMixins( DummyTask.class ); + } + + @Test + public void given15SecondCronWhenRequestingNextExpectEvery15Seconds() + throws Exception + { + + UnitOfWork work = module.newUnitOfWork(); + EntityBuilder<Task> builder1 = work.newEntityBuilder( Task.class ); + builder1.instance().name().set( "abc" ); + Task task = builder1.newInstance(); + EntityBuilder<CronSchedule> builder = work.newEntityBuilder( CronSchedule.class ); + builder.instance().start().set( DateTime.now() ); + builder.instance().task().set( task ); + builder.instance().cronExpression().set( "*/15 * * * * *" ); + CronSchedule schedule = builder.newInstance(); + long runAt = schedule.nextRun( System.currentTimeMillis() ); + for( int i = 0; i < 1000; i++ ) + { + long nextRun = schedule.nextRun( runAt + 1000 ); // Needs to push forward one second... + assertThat( "At:" + i, (double) nextRun, closeTo( runAt + 15000, 50 ) ); + } + work.discard(); + } + + public static abstract class DummyTask implements Task + { + @Override + public void run() + { + System.out.println( "Dummy" ); + } + } +}
