Author: ash2k Date: Mon Apr 1 08:07:14 2013 New Revision: 1463109 URL: http://svn.apache.org/r1463109 Log: [ONAMI-105] improve interruption handling in warmup, fix test
Modified: incubator/onami/trunk/lifecycle/warmup/src/main/java/org/apache/onami/lifecycle/warmup/WarmUpTask.java incubator/onami/trunk/lifecycle/warmup/src/main/java/org/apache/onami/lifecycle/warmup/WarmUper.java incubator/onami/trunk/lifecycle/warmup/src/test/java/org/apachi/onami/lifecycle/warmup/Dag1.java incubator/onami/trunk/lifecycle/warmup/src/test/java/org/apachi/onami/lifecycle/warmup/TestWarmUpManager.java Modified: incubator/onami/trunk/lifecycle/warmup/src/main/java/org/apache/onami/lifecycle/warmup/WarmUpTask.java URL: http://svn.apache.org/viewvc/incubator/onami/trunk/lifecycle/warmup/src/main/java/org/apache/onami/lifecycle/warmup/WarmUpTask.java?rev=1463109&r1=1463108&r2=1463109&view=diff ============================================================================== --- incubator/onami/trunk/lifecycle/warmup/src/main/java/org/apache/onami/lifecycle/warmup/WarmUpTask.java (original) +++ incubator/onami/trunk/lifecycle/warmup/src/main/java/org/apache/onami/lifecycle/warmup/WarmUpTask.java Mon Apr 1 08:07:14 2013 @@ -92,6 +92,14 @@ class WarmUpTask { for ( Stageable stageable : stageables ) { + if ( Thread.interrupted() ) + { + // Warmup is taking too long - thread was interrupted. + // Skip other stageables. + // Maintain interruption state to let other tasks know about it. + Thread.currentThread().interrupt(); + break; + } stageable.stage( stageHandler ); } } Modified: incubator/onami/trunk/lifecycle/warmup/src/main/java/org/apache/onami/lifecycle/warmup/WarmUper.java URL: http://svn.apache.org/viewvc/incubator/onami/trunk/lifecycle/warmup/src/main/java/org/apache/onami/lifecycle/warmup/WarmUper.java?rev=1463109&r1=1463108&r2=1463109&view=diff ============================================================================== --- incubator/onami/trunk/lifecycle/warmup/src/main/java/org/apache/onami/lifecycle/warmup/WarmUper.java (original) +++ incubator/onami/trunk/lifecycle/warmup/src/main/java/org/apache/onami/lifecycle/warmup/WarmUper.java Mon Apr 1 08:07:14 2013 @@ -136,6 +136,7 @@ public class WarmUper<A extends Annotati } catch ( InterruptedException e ) { + forkJoinPool.shutdownNow(); Thread.currentThread().interrupt(); } Modified: incubator/onami/trunk/lifecycle/warmup/src/test/java/org/apachi/onami/lifecycle/warmup/Dag1.java URL: http://svn.apache.org/viewvc/incubator/onami/trunk/lifecycle/warmup/src/test/java/org/apachi/onami/lifecycle/warmup/Dag1.java?rev=1463109&r1=1463108&r2=1463109&view=diff ============================================================================== --- incubator/onami/trunk/lifecycle/warmup/src/test/java/org/apachi/onami/lifecycle/warmup/Dag1.java (original) +++ incubator/onami/trunk/lifecycle/warmup/src/test/java/org/apachi/onami/lifecycle/warmup/Dag1.java Mon Apr 1 08:07:14 2013 @@ -19,6 +19,8 @@ package org.apachi.onami.lifecycle.warmu * under the License. */ +import java.util.concurrent.CountDownLatch; + import com.google.inject.Inject; import com.google.inject.Singleton; import org.apache.onami.lifecycle.warmup.WarmUp; @@ -40,18 +42,27 @@ public class Dag1 public static class A { private final Recorder recorder; + private final CountDownLatch latch; @Inject - public A( Recorder recorder, B b, C c ) + public A( Recorder recorder, B b, C c, CountDownLatch latch ) { this.recorder = recorder; + this.latch = latch; } @WarmUp public void warmUp() throws InterruptedException { - recorder.record( "A" ); + try + { + recorder.record( "A" ); + } + finally + { + latch.countDown(); + } } } @@ -59,18 +70,27 @@ public class Dag1 public static class B { private final Recorder recorder; + private final CountDownLatch latch; @Inject - public B( Recorder recorder ) + public B( Recorder recorder, CountDownLatch latch ) { this.recorder = recorder; + this.latch = latch; } @WarmUp public void warmUp() throws InterruptedException { - recorder.record( "B" ); + try + { + recorder.record( "B" ); + } + finally + { + latch.countDown(); + } } } @@ -78,18 +98,27 @@ public class Dag1 public static class C { private final Recorder recorder; + private final CountDownLatch latch; @Inject - public C( Recorder recorder ) + public C( Recorder recorder, CountDownLatch latch ) { this.recorder = recorder; + this.latch = latch; } @WarmUp public void warmUp() throws InterruptedException { - recorder.record( "C" ); + try + { + recorder.record( "C" ); + } + finally + { + latch.countDown(); + } } } } Modified: incubator/onami/trunk/lifecycle/warmup/src/test/java/org/apachi/onami/lifecycle/warmup/TestWarmUpManager.java URL: http://svn.apache.org/viewvc/incubator/onami/trunk/lifecycle/warmup/src/test/java/org/apachi/onami/lifecycle/warmup/TestWarmUpManager.java?rev=1463109&r1=1463108&r2=1463109&view=diff ============================================================================== --- incubator/onami/trunk/lifecycle/warmup/src/test/java/org/apachi/onami/lifecycle/warmup/TestWarmUpManager.java (original) +++ incubator/onami/trunk/lifecycle/warmup/src/test/java/org/apachi/onami/lifecycle/warmup/TestWarmUpManager.java Mon Apr 1 08:07:14 2013 @@ -26,6 +26,7 @@ import static org.junit.Assert.assertTru import java.util.Arrays; import java.util.HashSet; import java.util.Set; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; @@ -73,14 +74,22 @@ public class TestWarmUpManager } }; injector.getInstance( LifeCycleStageModule.key( WarmUp.class ) ).stage( stageHandler ); - assertEquals( errorCount.get(), 1 ); + assertEquals( 1, errorCount.get() ); } @Test public void testDag1() throws Exception { - Injector injector = Guice.createInjector( WarmUpModule.newWarmUpModule() ); + Module module = new AbstractModule() + { + @Override + protected void configure() + { + bind( CountDownLatch.class ).toInstance( new CountDownLatch( 3 ) ); + } + }; + Injector injector = Guice.createInjector( WarmUpModule.newWarmUpModule(), module ); injector.getInstance( Dag1.A.class ); injector.getInstance( LifeCycleStageModule.key( WarmUp.class ) ).stage(); Recorder recorder = injector.getInstance( Recorder.class ); @@ -92,7 +101,7 @@ public class TestWarmUpManager assertNotConcurrent( recorder, "A", "B" ); assertNotConcurrent( recorder, "A", "C" ); - assertEquals( recorder.getInterruptions().size(), 0 ); + assertEquals( 0, recorder.getInterruptions().size() ); assertOrdering( recorder, "A", "B" ); assertOrdering( recorder, "A", "C" ); } @@ -126,7 +135,7 @@ public class TestWarmUpManager assertNotConcurrent( recorder, "B3", "C3" ); assertNotConcurrent( recorder, "B4", "C3" ); - assertEquals( recorder.getInterruptions().size(), 0 ); + assertEquals( 0, recorder.getInterruptions().size() ); assertOrdering( recorder, "A1", "B1" ); assertOrdering( recorder, "B1", "C1" ); assertOrdering( recorder, "A1", "B2" ); @@ -160,7 +169,7 @@ public class TestWarmUpManager assertNotConcurrent( recorder, "A", "B" ); assertNotConcurrent( recorder, "A", "C" ); - assertEquals( recorder.getInterruptions().size(), 0 ); + assertEquals( 0, recorder.getInterruptions().size() ); assertOrdering( recorder, "A", "C" ); assertOrdering( recorder, "C", "D" ); assertOrdering( recorder, "A", "D" ); @@ -191,7 +200,7 @@ public class TestWarmUpManager System.out.println( recorder.getConcurrents() ); assertSingleExecution( recorder ); - assertEquals( recorder.getInterruptions().size(), 0 ); + assertEquals( 0, recorder.getInterruptions().size() ); assertOrdering( recorder, "D", "E" ); assertOrdering( recorder, "C", "E" ); assertOrdering( recorder, "B", "D" ); @@ -212,7 +221,7 @@ public class TestWarmUpManager System.out.println( recorder.getConcurrents() ); assertSingleExecution( recorder ); - assertEquals( recorder.getInterruptions().size(), 0 ); + assertEquals( 0, recorder.getInterruptions().size() ); assertTrue( recorder.getRecordings().indexOf( "A" ) >= 0 ); assertTrue( recorder.getRecordings().indexOf( "B" ) >= 0 ); } @@ -221,6 +230,7 @@ public class TestWarmUpManager public void testStuck() throws Exception { + final CountDownLatch latch = new CountDownLatch( 2 ); Module module = new AbstractModule() { @Override @@ -229,6 +239,7 @@ public class TestWarmUpManager RecorderSleepSettings recorderSleepSettings = new RecorderSleepSettings(); recorderSleepSettings.setBaseSleepFor( "C", 1, TimeUnit.DAYS ); bind( RecorderSleepSettings.class ).toInstance( recorderSleepSettings ); + bind( CountDownLatch.class ).toInstance( latch ); } }; LifeCycleStageModule<WarmUp> warmUpModule = WarmUpModule.builder().withMaxWait( 1, TimeUnit.SECONDS ).build(); @@ -248,11 +259,8 @@ public class TestWarmUpManager assertTrue( e.getCause() instanceof TimeoutException ); } - // Wait for all interrupted warmup tasks to finish - // and add themselfs to recorder. - // This fixes race between test thread and interrupted tasks - // threads. This workaround is good enough for test. - Thread.sleep( 1000 ); + // Wait for all warmup methods to finish after interruption + assertTrue( latch.await( 1, TimeUnit.MINUTES ) ); Recorder recorder = injector.getInstance( Recorder.class ); @@ -262,7 +270,9 @@ public class TestWarmUpManager assertSingleExecution( recorder ); assertFalse( succeeded ); assertTrue( recorder.getRecordings().contains( "B" ) ); - assertEquals( recorder.getInterruptions(), Arrays.asList( "C" ) ); + // What is interrupted depends on warmup order + assertTrue( Arrays.asList( "C" ).equals( recorder.getInterruptions() ) || + Arrays.asList( "C", "B" ).equals( recorder.getInterruptions() ) ); } private void assertSingleExecution( Recorder recorder )