Repository: zest-java
Updated Branches:
  refs/heads/develop d85a93fe8 -> 73b7f1969


ZEST-128 - Chasing race condition problems.


Project: http://git-wip-us.apache.org/repos/asf/zest-java/repo
Commit: http://git-wip-us.apache.org/repos/asf/zest-java/commit/551f6846
Tree: http://git-wip-us.apache.org/repos/asf/zest-java/tree/551f6846
Diff: http://git-wip-us.apache.org/repos/asf/zest-java/diff/551f6846

Branch: refs/heads/develop
Commit: 551f68463b594aa95768fc884154871436c0edf8
Parents: d85a93f
Author: Niclas Hedhman <[email protected]>
Authored: Sat Nov 14 18:56:19 2015 +0800
Committer: Niclas Hedhman <[email protected]>
Committed: Sat Nov 14 18:56:19 2015 +0800

----------------------------------------------------------------------
 .../association/AssociationAssignmentTest.java  | 83 ++++++++++++++++++++
 .../zest/library/scheduler/Execution.java       | 37 ++++-----
 .../zest/library/scheduler/SchedulerMixin.java  |  2 +-
 .../zest/library/scheduler/TaskRunner.java      | 29 ++++++-
 .../scheduler/schedule/cron/CronSchedule.java   |  3 -
 .../scheduler/schedule/once/OnceSchedule.java   |  3 -
 .../library/scheduler/CronScheduleTest.java     | 82 +++++++++++++++++++
 7 files changed, 207 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/core/runtime/src/test/java/org/apache/zest/runtime/association/AssociationAssignmentTest.java
----------------------------------------------------------------------
diff --git a/core/runtime/src/test/java/org/apache/zest/runtime/association/AssociationAssignmentTest.java b/core/runtime/src/test/java/org/apache/zest/runtime/association/AssociationAssignmentTest.java
new file mode 100644
index 0000000..e224008
--- /dev/null
+++ b/core/runtime/src/test/java/org/apache/zest/runtime/association/AssociationAssignmentTest.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.zest.runtime.association;
+
+import org.apache.zest.api.association.Association;
+import org.apache.zest.api.entity.EntityBuilder;
+import org.apache.zest.api.entity.EntityComposite;
+import org.apache.zest.api.unitofwork.UnitOfWork;
+import org.apache.zest.bootstrap.AssemblyException;
+import org.apache.zest.bootstrap.ModuleAssembly;
+import org.apache.zest.entitystore.memory.MemoryEntityStoreService;
+import org.apache.zest.spi.uuid.UuidIdentityGeneratorService;
+import org.apache.zest.test.AbstractZestTest;
+import org.apache.zest.valueserialization.orgjson.OrgJsonValueSerializationAssembler;
+import org.junit.Test;
+
+import static org.hamcrest.core.IsEqual.equalTo;
+import static org.hamcrest.core.IsNull.notNullValue;
+import static org.junit.Assert.assertThat;
+
+public class AssociationAssignmentTest extends AbstractZestTest
+{
+
+    @Override
+    public void assemble( ModuleAssembly module )
+        throws AssemblyException
+    {
+        module.services( MemoryEntityStoreService.class );
+        module.services( UuidIdentityGeneratorService.class );
+        new OrgJsonValueSerializationAssembler().assemble( module );
+        module.entities( TheAssociatedType.class );
+        module.entities( TheMainType.class );
+    }
+
+    @Test
+    public void givenAssignmentOfAssociationAtCreationWhenDereferencingAssocationExpectCorrectValue()
+        throws Exception
+    {
+        UnitOfWork work = module.newUnitOfWork();
+        TheAssociatedType entity1 = work.newEntity( TheAssociatedType.class );
+        EntityBuilder<TheMainType> builder = work.newEntityBuilder( TheMainType.class );
+        builder.instance().assoc().set( entity1 );
+        TheMainType entity2 = builder.newInstance();
+        String id1 = entity1.identity().get();
+        String id2 = entity2.identity().get();
+        work.complete();
+        assertThat(id1, notNullValue());
+        assertThat(id2, notNullValue());
+
+        work = module.newUnitOfWork();
+        TheMainType entity3 = work.get(TheMainType.class, id2 );
+        TheAssociatedType entity4 = entity3.assoc().get();
+        assertThat( entity4.identity().get(), equalTo(id1));
+        work.discard();
+    }
+
+    public interface TheAssociatedType extends EntityComposite
+    {
+    }
+
+    public interface TheMainType extends EntityComposite
+    {
+        Association<TheAssociatedType> assoc();
+    }
+}

http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
index 3bc120c..d3a732b 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/Execution.java
@@ -20,12 +20,10 @@
 
 package org.apache.zest.library.scheduler;
 
-import java.lang.reflect.UndeclaredThrowableException;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.RejectedExecutionHandler;
-import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
@@ -37,7 +35,6 @@ import org.apache.zest.api.mixin.Mixins;
 import org.apache.zest.api.structure.Module;
 import org.apache.zest.api.unitofwork.NoSuchEntityException;
 import org.apache.zest.api.unitofwork.UnitOfWork;
-import org.apache.zest.api.unitofwork.UnitOfWorkCompletionException;
 import org.apache.zest.api.unitofwork.concern.UnitOfWorkConcern;
 import org.apache.zest.api.unitofwork.concern.UnitOfWorkPropagation;
 import org.apache.zest.api.unitofwork.concern.UnitOfWorkRetry;
@@ -84,7 +81,7 @@ public interface Execution
         private final SortedSet<ScheduleTime> timingQueue = new TreeSet<>();
         private volatile boolean running;
         private ThreadPoolExecutor taskExecutor;
-        private Thread scheduleThread;
+        private volatile Thread scheduleThread;
 
         @Override
         @UnitOfWorkPropagation
@@ -114,6 +111,10 @@ public interface Execution
                     {
                         // Ignore. Used to signal "Hey, wake up. Time to work..."
                     }
+                    catch( Throwable e )
+                    {
+                        e.printStackTrace();
+                    }
                 }
             }
         }
@@ -133,29 +134,21 @@ public interface Execution
         public void updateNextTime( ScheduleTime scheduleTime )
         {
             long now = System.currentTimeMillis();
-
-            try (UnitOfWork uow = module.newUnitOfWork())
+            UnitOfWork uow = module.currentUnitOfWork();
+            try
             {
-                try
-                {
-                    Schedule schedule = uow.get( Schedule.class, scheduleTime.scheduleIdentity() );
-                    long nextTime = schedule.nextRun( now );
-                    if( nextTime != Long.MIN_VALUE )
-                    {
-                        scheduleTime = new ScheduleTime( schedule.identity().get(), nextTime );
-                        timingQueue.add( scheduleTime );
-                    }
-                }
-                catch( NoSuchEntityException e )
+                Schedule schedule = uow.get( Schedule.class, scheduleTime.scheduleIdentity() );
+                long nextTime = schedule.nextRun( now + 1000 );
+                if( nextTime != Long.MIN_VALUE )
                 {
-                    // Schedule has been removed.
-                    scheduler.cancelSchedule( scheduleTime.scheduleIdentity() 
);
+                    scheduleTime = new ScheduleTime( 
schedule.identity().get(), nextTime );
+                    timingQueue.add( scheduleTime );
                 }
-                uow.complete();
             }
-            catch( UnitOfWorkCompletionException e )
+            catch( NoSuchEntityException e )
             {
-                throw new UndeclaredThrowableException( e );
+                // Schedule has been removed.
+                scheduler.cancelSchedule( scheduleTime.scheduleIdentity() );
             }
         }
 

http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java
index 52c2f56..9f7423c 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/SchedulerMixin.java
@@ -172,8 +172,8 @@ public class SchedulerMixin
         // Throws IllegalArgument if corePoolSize or keepAliveTime less than zero,
         // or if workersCount less than or equal to zero,
         // or if corePoolSize greater than workersCount.
-        loadSchedules();
         execution.start();
+        loadSchedules();
         LOGGER.debug( "Activated" );
     }
 

http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
index 9f5d772..3e0e975 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/TaskRunner.java
@@ -21,6 +21,7 @@
 package org.apache.zest.library.scheduler;
 
 import java.lang.reflect.UndeclaredThrowableException;
+import java.util.concurrent.locks.ReentrantLock;
 import org.apache.zest.api.injection.scope.Structure;
 import org.apache.zest.api.injection.scope.Uses;
 import org.apache.zest.api.structure.Module;
@@ -32,6 +33,8 @@ import org.apache.zest.library.scheduler.schedule.ScheduleTime;
 public class TaskRunner
     implements Runnable
 {
+    private static ReentrantLock lock = new ReentrantLock();
+
     @Structure
     private Module module;
 
@@ -42,30 +45,37 @@ public class TaskRunner
     public void run()
     {
         // TODO: (niclas) I am NOT happy with this implementation, requiring 3 UnitOfWorks to be created. 15-20 milliseconds on my MacBook. If there is a better way to detect overrun, two of those might not be needed.
+        UnitOfWork uow = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Task Runner initialize" ) );
         try
         {
-            UnitOfWork uow = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Task Runner initialize" ) );
+            lock.lock();
             Schedule schedule = uow.get( Schedule.class, this.schedule.scheduleIdentity() );
             if( !schedule.running().get() )  // check for overrun.
             {
                 try
                 {
                     schedule.taskStarting();
+                    schedule.running().set( true );
                     uow.complete();                     // This completion is needed to detect overrun
+                    lock.unlock();
+
                     uow = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Task Runner" ) );
                     schedule = uow.get( schedule );     // re-attach the entity to the new UnitOfWork
                     Task task = schedule.task().get();
                     task.run();
-                    uow.complete();                     // Need this to avoid ConcurrentModificationException when there has been an overrun.
+                    uow.complete();
+                    lock.lock();
                     uow = module.newUnitOfWork( UsecaseBuilder.newUsecase( "Task Runner conclude" ) );
                     schedule = uow.get( schedule );     // re-attach the entity to the new UnitOfWork
+                    schedule.running().set( false );
                     schedule.taskCompletedSuccessfully();
+                    schedule.executionCounter().set( schedule.executionCounter().get() + 1 );
                 }
                 catch( RuntimeException ex )
                 {
+                    schedule.running().set( false );
                     processException( schedule, ex );
                 }
-                schedule.executionCounter().set( schedule.executionCounter().get() + 1 );
             }
             else
             {
@@ -75,8 +85,21 @@ public class TaskRunner
         }
         catch( Exception e )
         {
+            e.printStackTrace();
             throw new UndeclaredThrowableException( e );
         }
+        finally
+        {
+            uow.discard();
+            try
+            {
+                lock.unlock();
+            }
+            catch( IllegalMonitorStateException e )
+            {
+                // ignore, as it may happen on certain exceptions.
+            }
+        }
     }
 
     private void processException( Schedule schedule, RuntimeException ex )

http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java
index 9036592..4597e75 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/cron/CronSchedule.java
@@ -46,19 +46,16 @@ public interface CronSchedule
         @Override
         public void taskStarting()
         {
-            running().set( true );
         }
 
         @Override
         public void taskCompletedSuccessfully()
         {
-            running().set(false);
         }
 
         @Override
         public void taskCompletedWithException( Throwable ex )
         {
-            running().set(false);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java
index ca31cf4..8c78cc6 100644
--- a/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java
+++ b/libraries/scheduler/src/main/java/org/apache/zest/library/scheduler/schedule/once/OnceSchedule.java
@@ -29,19 +29,16 @@ public interface OnceSchedule
         @Override
         public void taskStarting()
         {
-            running().set( true );
         }
 
         @Override
         public void taskCompletedSuccessfully()
         {
-            running().set( false );
         }
 
         @Override
         public void taskCompletedWithException( Throwable ex )
         {
-            running().set( false );
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/zest-java/blob/551f6846/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/CronScheduleTest.java
----------------------------------------------------------------------
diff --git a/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/CronScheduleTest.java b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/CronScheduleTest.java
new file mode 100644
index 0000000..76063f0
--- /dev/null
+++ b/libraries/scheduler/src/test/java/org/apache/zest/library/scheduler/CronScheduleTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.zest.library.scheduler;
+
+import org.apache.zest.api.entity.EntityBuilder;
+import org.apache.zest.api.unitofwork.UnitOfWork;
+import org.apache.zest.bootstrap.AssemblyException;
+import org.apache.zest.bootstrap.ModuleAssembly;
+import org.apache.zest.entitystore.memory.MemoryEntityStoreService;
+import org.apache.zest.library.scheduler.schedule.cron.CronSchedule;
+import org.apache.zest.spi.uuid.UuidIdentityGeneratorService;
+import org.apache.zest.test.AbstractZestTest;
+import org.apache.zest.valueserialization.orgjson.OrgJsonValueSerializationAssembler;
+import org.joda.time.DateTime;
+import org.junit.Test;
+
+import static org.hamcrest.number.IsCloseTo.closeTo;
+import static org.junit.Assert.assertThat;
+
+public class CronScheduleTest extends AbstractZestTest
+{
+    @Override
+    public void assemble( ModuleAssembly module )
+        throws AssemblyException
+    {
+        new OrgJsonValueSerializationAssembler().assemble( module );
+        module.services( MemoryEntityStoreService.class );
+        module.services( UuidIdentityGeneratorService.class );
+        module.entities( CronSchedule.class );
+        module.entities( Task.class ).withMixins( DummyTask.class );
+    }
+
+    @Test
+    public void given15SecondCronWhenRequestingNextExpectEvery15Seconds()
+        throws Exception
+    {
+
+        UnitOfWork work = module.newUnitOfWork();
+        EntityBuilder<Task> builder1 = work.newEntityBuilder( Task.class );
+        builder1.instance().name().set( "abc" );
+        Task task = builder1.newInstance();
+        EntityBuilder<CronSchedule> builder = work.newEntityBuilder( CronSchedule.class );
+        builder.instance().start().set( DateTime.now() );
+        builder.instance().task().set( task );
+        builder.instance().cronExpression().set( "*/15 * * * * *" );
+        CronSchedule schedule = builder.newInstance();
+        long runAt = schedule.nextRun( System.currentTimeMillis() );
+        for( int i = 0; i < 1000; i++ )
+        {
+            long nextRun = schedule.nextRun( runAt + 1000 );  // Needs to push forward one second...
+            assertThat( "At:" + i, (double) nextRun, closeTo( runAt + 15000, 50 ) );
+        }
+        work.discard();
+    }
+
+    public static abstract class DummyTask implements Task
+    {
+        @Override
+        public void run()
+        {
+            System.out.println( "Dummy" );
+        }
+    }
+}

Reply via email to