Repository: brooklyn-server Updated Branches: refs/heads/0.12.0 2e348971b -> 047128103
Fix PeriodicEffectorPolicy Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/04712810 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/04712810 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/04712810 Branch: refs/heads/0.12.0 Commit: 0471281036d8f55810a6a462ecbbeb75d2f27654 Parents: 2e34897 Author: Aled Sage <[email protected]> Authored: Tue Sep 19 23:16:07 2017 +0100 Committer: Duncan Godwin <[email protected]> Committed: Wed Sep 20 12:02:21 2017 +0100 ---------------------------------------------------------------------- .../core/test/entity/TestEntityImpl.java | 2 +- .../action/AbstractScheduledEffectorPolicy.java | 88 ++++++++++++------ .../policy/action/PeriodicEffectorPolicy.java | 30 ++++--- .../policy/action/ScheduledEffectorPolicy.java | 7 +- .../action/AbstractEffectorPolicyTest.java | 81 +++++++++++++++++ .../action/PeriodicEffectorPolicyTest.java | 79 +++++++++++------ .../action/ScheduledEffectorPolicyTest.java | 65 ++++++++------ .../action/ScheduledPolicyRebindTest.java | 93 +++++++++++--------- 8 files changed, 305 insertions(+), 140 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/04712810/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java b/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java index a2a64b9..065b432 100644 --- a/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java +++ b/core/src/test/java/org/apache/brooklyn/core/test/entity/TestEntityImpl.java @@ -219,7 +219,7 @@ public class TestEntityImpl extends AbstractEntity implements TestEntity { @Override public void clearCallHistory() { - 
callHistory.clear();; + callHistory.clear(); } public static class TestEntityWithoutEnrichers extends TestEntityImpl { http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/04712810/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java index f4635bc..b61e012 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/action/AbstractScheduledEffectorPolicy.java @@ -40,6 +40,7 @@ import org.apache.brooklyn.core.config.ConfigKeys; import org.apache.brooklyn.core.entity.EntityInitializers; import org.apache.brooklyn.core.entity.trait.Startable; import org.apache.brooklyn.core.policy.AbstractPolicy; +import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.core.config.ConfigBag; import org.apache.brooklyn.util.core.config.ResolvingConfigBag; import org.apache.brooklyn.util.core.task.Tasks; @@ -53,9 +54,9 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.Beta; import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; import com.google.common.reflect.TypeToken; @Beta @@ -101,7 +102,7 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp public static final ConfigKey<Boolean> RUNNING = ConfigKeys.builder(Boolean.class) .name("running") - .description("Set if the executor has started") + .description("[INTERNAL] Set if the executor has started") .defaultValue(Boolean.FALSE) .reconfigurable(true) .build(); @@ -109,7 +110,7 @@ 
public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp public static final ConfigKey<List<Long>> SCHEDULED = ConfigKeys.builder(new TypeToken<List<Long>>() { }) .name("scheduled") .description("List of all scheduled execution start times") - .defaultValue(Lists.newCopyOnWriteArrayList()) + .defaultValue(ImmutableList.of()) .reconfigurable(true) .build(); @@ -126,6 +127,12 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp setup(); } + @Override + public void rebind() { + // Called before setEntity; therefore don't do any real work here that might cause us to reference the entity + setup(); + } + public void setup() { if (executor != null) { executor.shutdownNow(); @@ -140,29 +147,39 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp effector = getEffector(); + if (Boolean.TRUE.equals(config().get(RUNNING))) { + running.set(true); + resubmitOnResume(); + } + AttributeSensor<Boolean> sensor = config().get(START_SENSOR); - subscriptions().subscribe(entity, sensor, this); + subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, sensor, this); } @Override - public void rebind() { - setup(); - - if (config().get(RUNNING)) { - running.set(true); - - List<Long> scheduled = config().get(SCHEDULED); - for (Long when : scheduled) { - Duration wait = Duration.millis(when - System.currentTimeMillis()); - if (wait.isPositive()) { - schedule(wait); - } else { - scheduled.remove(when); - } + public void resume() { + super.resume(); + + if (running.get()) { + resubmitOnResume(); + } + } + + protected List<Long> resubmitOnResume() { + List<Long> scheduled = config().get(SCHEDULED); + List<Long> updatedScheduled = MutableList.copyOf(scheduled); + for (Long when : scheduled) { + Duration wait = Duration.millis(when - System.currentTimeMillis()); + if (wait.isPositive()) { + scheduleInExecutor(wait); + } else { + updatedScheduled.remove(when); } } + 
config().set(SCHEDULED, updatedScheduled); + return updatedScheduled; } - + @Override protected <T> void doReconfigureConfig(ConfigKey<T> key, T val) { if (key.isReconfigurable()) { @@ -178,13 +195,14 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp super.destroy(); } + public abstract void start(); protected Effector<?> getEffector() { String effectorName = config().get(EFFECTOR); Maybe<Effector<?>> effector = getEntity().getEntityType().getEffectorByName(effectorName); if (effector.isAbsentOrNull()) { - throw new IllegalStateException("Cannot find effector " + effectorName); + throw new IllegalStateException("Cannot find effector " + effectorName + " on entity " + getEntity()); } return effector.get(); } @@ -196,8 +214,7 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp try { Calendar now = Calendar.getInstance(); Calendar when = Calendar.getInstance(); - boolean formatted = time.contains(":"); // FIXME deprecated TimeDuration coercion - Date parsed = formatted ? 
FORMATTER.parse(time) : new Date(Long.parseLong(time) * 1000); + Date parsed = parseTime(time); when.setTime(parsed); when.set(now.get(Calendar.YEAR), now.get(Calendar.MONTH), now.get(Calendar.DATE)); if (when.before(now)) { @@ -210,16 +227,35 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp } } + protected Date parseTime(String time) throws ParseException { + boolean formatted = time.contains(":"); // FIXME deprecated TimeDuration coercion + if (formatted) { + synchronized (FORMATTER) { + // DateFormat is not thread-safe; docs say to use one-per-thread, or to synchronize externally + return FORMATTER.parse(time); + } + } else { + return new Date(Long.parseLong(time) * 1000); + } + } + protected void schedule(Duration wait) { - List<Long> scheduled = config().get(SCHEDULED); + List<Long> scheduled = MutableList.copyOf(config().get(SCHEDULED)); scheduled.add(System.currentTimeMillis() + wait.toMilliseconds()); + config().set(SCHEDULED, scheduled); + scheduleInExecutor(wait); + } + + private void scheduleInExecutor(Duration wait) { executor.schedule(this, wait.toMilliseconds(), TimeUnit.MILLISECONDS); } @Override public synchronized void run() { if (effector == null) return; + if (!(isRunning() && getManagementContext().isRunning())) return; + try { ConfigBag bag = ResolvingConfigBag.newInstanceExtending(getManagementContext(), config().getBag()); Map<String, Object> args = EntityInitializers.resolve(bag, EFFECTOR_ARGUMENTS); @@ -233,8 +269,8 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp Object result = entity.invoke(effector, resolved).getUnchecked(); LOG.debug("{}: Effector {} returned {}", new Object[] { this, effector.getName(), result }); } catch (RuntimeInterruptedException rie) { - Thread.interrupted(); - // TODO sometimes this seems to hang the executor? 
+ // Gracefully stop + Thread.currentThread().interrupt(); } catch (Throwable t) { LOG.warn("{}: Exception running {}: {}", new Object[] { this, effector.getName(), t.getMessage() }); Exceptions.propagate(t); @@ -246,7 +282,7 @@ public abstract class AbstractScheduledEffectorPolicy extends AbstractPolicy imp LOG.debug("{}: Got event {}", this, event); AttributeSensor<Boolean> sensor = config().get(START_SENSOR); if (event.getSensor().getName().equals(sensor.getName())) { - Boolean start = (Boolean) event.getValue(); + Boolean start = Boolean.TRUE.equals(event.getValue()); if (start && running.compareAndSet(false, true)) { config().set(RUNNING, true); start(); http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/04712810/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java index 0c2de9f..dafc69c 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicy.java @@ -86,28 +86,30 @@ public class PeriodicEffectorPolicy extends AbstractScheduledEffectorPolicy { } @Override - public void rebind() { - super.rebind(); - - // Check if we missed an entire period - List<Long> scheduled = config().get(SCHEDULED); - if (running.get() && scheduled.isEmpty()) { + protected List<Long> resubmitOnResume() { + List<Long> scheduled = super.resubmitOnResume(); + + if (scheduled.isEmpty()) { + // We missed an entire period; re-calculate (rather than relying on run's finally block) start(); } + return scheduled; } - + @Override public synchronized void run() { try { super.run(); } finally { - Duration period = config().get(PERIOD); - String time = config().get(TIME); - if (time == null || 
time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) { - schedule(period); - } else { - Duration wait = getWaitUntil(time); - schedule(wait.upperBound(period)); + if (isRunning() && getManagementContext().isRunning()) { + Duration period = config().get(PERIOD); + String time = config().get(TIME); + if (time == null || time.equalsIgnoreCase(NOW) || time.equalsIgnoreCase(IMMEDIATELY)) { + schedule(period); + } else { + Duration wait = getWaitUntil(time); + schedule(wait.upperBound(period)); + } } } } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/04712810/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java index 5a3bed7..a2a5818 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicy.java @@ -31,6 +31,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.annotations.Beta; +import com.google.common.collect.ImmutableMap; /** * A {@link Policy} the executes an {@link Effector} at a specific time in the future. 
@@ -59,8 +60,8 @@ public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy { public void setEntity(final EntityLocal entity) { super.setEntity(entity); - subscriptions().subscribe(entity, INVOKE_IMMEDIATELY, this); - subscriptions().subscribe(entity, INVOKE_AT, this); + subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, INVOKE_IMMEDIATELY, this); + subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, INVOKE_AT, this); } @Override @@ -89,7 +90,7 @@ public class ScheduledEffectorPolicy extends AbstractScheduledEffectorPolicy { } } if (event.getSensor().getName().equals(INVOKE_IMMEDIATELY.getName())) { - Boolean invoke = (Boolean) event.getValue(); + Boolean invoke = Boolean.TRUE.equals(event.getValue()); if (invoke) { schedule(Duration.ZERO); } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/04712810/policy/src/test/java/org/apache/brooklyn/policy/action/AbstractEffectorPolicyTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/AbstractEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/AbstractEffectorPolicyTest.java new file mode 100644 index 0000000..dd0fed3 --- /dev/null +++ b/policy/src/test/java/org/apache/brooklyn/policy/action/AbstractEffectorPolicyTest.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.policy.action; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertTrue; + +import java.util.List; + +import org.apache.brooklyn.api.objs.Configurable; +import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.sensor.Sensors; +import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; +import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.test.Asserts; +import org.apache.brooklyn.util.time.Duration; + +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +public class AbstractEffectorPolicyTest extends BrooklynAppUnitTestSupport { + + protected static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start"); + + protected <T> void assertConfigEqualsEventually(Configurable obj, ConfigKey<T> running, T val) { + Asserts.eventually(() -> obj.config().get(running), Predicates.equalTo(val)); + } + + protected void assertCallHistoryNeverContinually(TestEntity entity, String effector) { + Asserts.continually(() -> entity.getCallHistory(), l -> !l.contains(effector)); + } + + protected void assertCallHistoryContainsEventually(TestEntity entity, String effector) { + assertCallHistoryEventually(entity, effector, 1); + } + + protected void assertCallHistoryEventually(TestEntity entity, String effector, int minSize) { + Asserts.succeedsEventually(new Runnable() { + public void run() 
{ + int size = getCallHistoryCount(entity, effector); + assertTrue(size >= minSize, "size="+size); + }}); + } + + protected void assertCallsStopEventually(TestEntity entity, String effector) { + Asserts.succeedsEventually(new Runnable() { + public void run() { + int size1 = getCallHistoryCount(entity, effector); + Asserts.succeedsContinually(ImmutableMap.of("timeout", Duration.millis(100)), new Runnable() { + public void run() { + int size2 = getCallHistoryCount(entity, effector); + assertEquals(size1, size2); + }}); + }}); + } + + protected int getCallHistoryCount(TestEntity entity, String effector) { + List<String> callHistory = entity.getCallHistory(); + synchronized (callHistory) { + return Iterables.size(Iterables.filter(callHistory, Predicates.equalTo("myEffector"))); + } + } +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/04712810/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java index 0268d60..a2aa4a6 100644 --- a/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java +++ b/policy/src/test/java/org/apache/brooklyn/policy/action/PeriodicEffectorPolicyTest.java @@ -22,22 +22,17 @@ package org.apache.brooklyn.policy.action; import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.policy.Policy; import org.apache.brooklyn.api.policy.PolicySpec; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.core.sensor.Sensors; -import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.test.Asserts; -import org.apache.brooklyn.util.exceptions.Exceptions; import 
org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; import org.testng.annotations.Test; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport { - - private static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start"); +public class PeriodicEffectorPolicyTest extends AbstractEffectorPolicyTest { @Test public void testPeriodicEffectorFires() { @@ -55,20 +50,19 @@ public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport { Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING)); entity.sensors().set(START, Boolean.TRUE); - Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b); - Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); - int calls = entity.getCallHistory().size(); - Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500)); + assertConfigEqualsEventually(policy, PeriodicEffectorPolicy.RUNNING, true); + assertCallHistoryEventually(entity, "myEffector", 2); } - @Test + // Integration because of long wait + @Test(groups="Integration") public void testPeriodicEffectorFiresAfterDelay() { TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) .policy(PolicySpec.create(PeriodicEffectorPolicy.class) .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector") .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND) - .configure(PeriodicEffectorPolicy.WAIT, Duration.TEN_SECONDS) + .configure(PeriodicEffectorPolicy.WAIT, Duration.FIVE_SECONDS) .configure(PeriodicEffectorPolicy.START_SENSOR, START))); Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); 
Asserts.assertNotNull(policy); @@ -77,20 +71,53 @@ public class PeriodicEffectorPolicyTest extends BrooklynAppUnitTestSupport { Asserts.assertFalse(policy.config().get(PeriodicEffectorPolicy.RUNNING)); entity.sensors().set(START, Boolean.TRUE); - Asserts.eventually(() -> policy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b); - sleep(Duration.seconds(5)); - Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector")); - sleep(Duration.seconds(5)); - Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); - int calls = entity.getCallHistory().size(); - Asserts.eventually(() -> entity.getCallHistory().size(), i -> i > (calls + 500)); + assertConfigEqualsEventually(policy, PeriodicEffectorPolicy.RUNNING, true); + assertCallHistoryNeverContinually(entity, "myEffector"); + + Time.sleep(Duration.seconds(5)); + assertCallHistoryEventually(entity, "myEffector", 2); } - private void sleep(Duration duration) { - try { - Thread.sleep(duration.toMilliseconds()); - } catch (InterruptedException ie) { - Exceptions.propagate(ie); - } + @Test + public void testSuspendsAndResumes() { + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(PeriodicEffectorPolicy.class) + .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector") + .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND) + .configure(PeriodicEffectorPolicy.TIME, "immediately") + .configure(PeriodicEffectorPolicy.START_SENSOR, START))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + entity.sensors().set(START, Boolean.TRUE); + assertCallHistoryContainsEventually(entity, "myEffector"); + + policy.suspend(); + assertCallsStopEventually(entity, "myEffector"); + entity.clearCallHistory(); + + policy.resume(); + 
assertCallHistoryContainsEventually(entity, "myEffector"); + } + + @Test + public void testSuspendsAndResumeBeforeTriggered() { + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(PeriodicEffectorPolicy.class) + .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector") + .configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(PeriodicEffectorPolicy.PERIOD, Duration.ONE_MILLISECOND) + .configure(PeriodicEffectorPolicy.TIME, "immediately") + .configure(PeriodicEffectorPolicy.START_SENSOR, START))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + policy.suspend(); + policy.resume(); + assertCallHistoryNeverContinually(entity, "myEffector"); + + entity.sensors().set(START, Boolean.TRUE); + assertCallHistoryContainsEventually(entity, "myEffector"); } } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/04712810/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java index 5271de3..e6947b2 100644 --- a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java +++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledEffectorPolicyTest.java @@ -22,22 +22,17 @@ package org.apache.brooklyn.policy.action; import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.policy.Policy; import org.apache.brooklyn.api.policy.PolicySpec; -import org.apache.brooklyn.api.sensor.AttributeSensor; -import org.apache.brooklyn.core.sensor.Sensors; -import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import 
org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.test.Asserts; -import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.time.Duration; +import org.apache.brooklyn.util.time.Time; import org.testng.annotations.Test; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; -public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport { - - private static final AttributeSensor<Boolean> START = Sensors.newBooleanSensor("start"); +public class ScheduledEffectorPolicyTest extends AbstractEffectorPolicyTest { @Test public void testScheduledEffectorFiresImmediately() { @@ -54,17 +49,18 @@ public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport { Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING)); entity.sensors().set(START, Boolean.TRUE); - Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b); - Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); + assertConfigEqualsEventually(policy, ScheduledEffectorPolicy.RUNNING, true); + assertCallHistoryContainsEventually(entity, "myEffector"); } - @Test + // Integration because of long wait + @Test(groups="Integration") public void testScheduledEffectorFiresAfterDelay() { TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) .policy(PolicySpec.create(ScheduledEffectorPolicy.class) .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector") .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) - .configure(ScheduledEffectorPolicy.WAIT, Duration.TEN_SECONDS) + .configure(ScheduledEffectorPolicy.WAIT, Duration.FIVE_SECONDS) .configure(ScheduledEffectorPolicy.START_SENSOR, START))); Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull(); Asserts.assertNotNull(policy); @@ -73,11 
+69,33 @@ public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport { Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING)); entity.sensors().set(START, Boolean.TRUE); - Asserts.eventually(() -> policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b); - sleep(Duration.seconds(5)); - Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector")); - sleep(Duration.seconds(5)); - Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); + assertConfigEqualsEventually(policy, ScheduledEffectorPolicy.RUNNING, true); + assertCallHistoryNeverContinually(entity, "myEffector"); + + Time.sleep(Duration.seconds(5)); + assertCallHistoryContainsEventually(entity, "myEffector"); + } + + // Integration because of long wait + @Test(groups="Integration") + public void testSuspendsAndResumes() { + TestEntity entity = app.createAndManageChild(EntitySpec.create(TestEntity.class) + .policy(PolicySpec.create(ScheduledEffectorPolicy.class) + .configure(ScheduledEffectorPolicy.EFFECTOR, "myEffector") + .configure(ScheduledEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) + .configure(ScheduledEffectorPolicy.WAIT, Duration.FIVE_SECONDS) + .configure(ScheduledEffectorPolicy.START_SENSOR, START))); + Policy policy = Iterables.tryFind(entity.policies(), Predicates.instanceOf(ScheduledEffectorPolicy.class)).orNull(); + Asserts.assertNotNull(policy); + + entity.sensors().set(START, Boolean.TRUE); + assertConfigEqualsEventually(policy, ScheduledEffectorPolicy.RUNNING, true); + + policy.suspend(); + policy.resume(); + + Time.sleep(Duration.seconds(5)); + assertCallHistoryContainsEventually(entity, "myEffector"); } @Test @@ -94,19 +112,10 @@ public class ScheduledEffectorPolicyTest extends BrooklynAppUnitTestSupport { Asserts.assertFalse(policy.config().get(ScheduledEffectorPolicy.RUNNING)); entity.sensors().set(START, Boolean.TRUE); - Asserts.eventually(() -> 
policy.config().get(ScheduledEffectorPolicy.RUNNING), b -> b); - sleep(Duration.seconds(5)); - Asserts.eventually(() -> entity.getCallHistory(), l -> !l.contains("myEffector")); + assertConfigEqualsEventually(policy, ScheduledEffectorPolicy.RUNNING, true); + assertCallHistoryNeverContinually(entity, "myEffector"); entity.sensors().set(ScheduledEffectorPolicy.INVOKE_IMMEDIATELY, Boolean.TRUE); - Asserts.eventually(() -> entity.getCallHistory(), l -> l.contains("myEffector")); - } - - private void sleep(Duration duration) { - try { - Thread.sleep(duration.toMilliseconds()); - } catch (InterruptedException ie) { - Exceptions.propagate(ie); - } + assertCallHistoryContainsEventually(entity, "myEffector"); } } http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/04712810/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java ---------------------------------------------------------------------- diff --git a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java index 825ac31..2fdc80c 100644 --- a/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java +++ b/policy/src/test/java/org/apache/brooklyn/policy/action/ScheduledPolicyRebindTest.java @@ -18,12 +18,18 @@ */ package org.apache.brooklyn.policy.action; +import static org.testng.Assert.assertTrue; + +import java.util.List; + import org.apache.brooklyn.api.entity.EntitySpec; +import org.apache.brooklyn.api.objs.Configurable; import org.apache.brooklyn.api.policy.Policy; import org.apache.brooklyn.api.policy.PolicySpec; import org.apache.brooklyn.api.sensor.AttributeSensor; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.mgmt.rebind.RebindOptions; import org.apache.brooklyn.core.mgmt.rebind.RebindTestFixtureWithApp; -import org.apache.brooklyn.core.policy.AbstractPolicy; import 
org.apache.brooklyn.core.sensor.Sensors; import org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.test.Asserts; @@ -53,20 +59,16 @@ public class ScheduledPolicyRebindTest extends RebindTestFixtureWithApp { .configure(PeriodicEffectorPolicy.START_SENSOR, START))); origEntity.sensors().set(START, Boolean.TRUE); - Asserts.eventually(() -> origEntity.getCallHistory(), l -> l.contains("myEffector")); - - Policy origPolicy = Iterables.tryFind(origEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); - Asserts.assertNotNull(origPolicy); - newApp = rebind(); - ((AbstractPolicy) origPolicy).destroy(); - TestEntity newEntity = (TestEntity) Iterables.tryFind(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)).orNull(); - Asserts.assertNotNull(newEntity); - Policy newPolicy = Iterables.tryFind(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); - Asserts.assertNotNull(newPolicy); - - Asserts.eventually(() -> newPolicy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b); + assertCallHistoryContainsEventually(origEntity, "myEffector"); + + newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true)); + + TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)); + Policy newPolicy = Iterables.find(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)); + + assertConfigEqualsEventually(newPolicy, PeriodicEffectorPolicy.RUNNING, true); int calls = newEntity.getCallHistory().size(); - Asserts.eventually(() -> newEntity.getCallHistory().size(), i -> i > (calls + 500)); + assertCallHistoryEventually(newEntity, "myEffector", calls + 2); } @Test @@ -75,25 +77,20 @@ public class ScheduledPolicyRebindTest extends RebindTestFixtureWithApp { .policy(PolicySpec.create(PeriodicEffectorPolicy.class) .configure(PeriodicEffectorPolicy.EFFECTOR, "myEffector") 
.configure(PeriodicEffectorPolicy.EFFECTOR_ARGUMENTS, ImmutableMap.of()) - .configure(PeriodicEffectorPolicy.PERIOD, Duration.seconds(1)) + .configure(PeriodicEffectorPolicy.PERIOD, Duration.millis(100)) .configure(PeriodicEffectorPolicy.TIME, "immediately") .configure(PeriodicEffectorPolicy.START_SENSOR, START))); origEntity.sensors().set(START, Boolean.TRUE); - Asserts.eventually(() -> origEntity.getCallHistory(), l -> l.contains("myEffector")); - - Policy origPolicy = Iterables.tryFind(origEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); - Asserts.assertNotNull(origPolicy); - newApp = rebind(); - ((AbstractPolicy) origPolicy).destroy(); - TestEntity newEntity = (TestEntity) Iterables.tryFind(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)).orNull(); - Asserts.assertNotNull(newEntity); - Policy newPolicy = Iterables.tryFind(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); - Asserts.assertNotNull(newPolicy); - - Asserts.eventually(() -> newPolicy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b); - int calls = newEntity.getCallHistory().size(); - Asserts.eventually(() -> newEntity.getCallHistory().size(), i -> i > (calls + 5)); + assertCallHistoryContainsEventually(origEntity, "myEffector"); + + newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true)); + + TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)); + Policy newPolicy = Iterables.find(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)); + + assertConfigEqualsEventually(newPolicy, PeriodicEffectorPolicy.RUNNING, true); + assertCallHistoryContainsEventually(newEntity, "myEffector"); } @Test @@ -106,23 +103,35 @@ public class ScheduledPolicyRebindTest extends RebindTestFixtureWithApp { .configure(PeriodicEffectorPolicy.TIME, "immediately") .configure(PeriodicEffectorPolicy.START_SENSOR, START))); - Policy origPolicy = 
Iterables.tryFind(origEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); - Asserts.assertNotNull(origPolicy); - newApp = rebind(); - ((AbstractPolicy) origPolicy).destroy(); - TestEntity newEntity = (TestEntity) Iterables.tryFind(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)).orNull(); - Asserts.assertNotNull(newEntity); - Policy newPolicy = Iterables.tryFind(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)).orNull(); - Asserts.assertNotNull(newPolicy); + newApp = rebind(RebindOptions.create().terminateOrigManagementContext(true)); + + TestEntity newEntity = (TestEntity) Iterables.find(newApp.getChildren(), Predicates.instanceOf(TestEntity.class)); + Policy newPolicy = Iterables.find(newEntity.policies(), Predicates.instanceOf(PeriodicEffectorPolicy.class)); Asserts.assertFalse(newPolicy.config().get(PeriodicEffectorPolicy.RUNNING)); Asserts.assertFalse(newEntity.getCallHistory().contains("myEffector")); - Asserts.assertFalse(origEntity.getCallHistory().contains("myEffector")); newEntity.sensors().set(START, Boolean.TRUE); - Asserts.eventually(() -> newPolicy.config().get(PeriodicEffectorPolicy.RUNNING), b -> b); - Asserts.eventually(() -> newEntity.getCallHistory(), l -> l.contains("myEffector")); - int calls = newEntity.getCallHistory().size(); - Asserts.eventually(() -> newEntity.getCallHistory().size(), i -> i > (calls + 500)); + assertConfigEqualsEventually(newPolicy, PeriodicEffectorPolicy.RUNNING, true); + assertCallHistoryEventually(newEntity, "myEffector", 2); + } + + private <T> void assertConfigEqualsEventually(Configurable obj, ConfigKey<T> running, T val) { + Asserts.eventually(() -> obj.config().get(running), Predicates.equalTo(val)); + } + + private void assertCallHistoryContainsEventually(TestEntity entity, String effector) { + assertCallHistoryEventually(entity, effector, 1); + } + + private void assertCallHistoryEventually(TestEntity entity, String effector, int minSize) { + 
Asserts.succeedsEventually(new Runnable() { + public void run() { + List<String> callHistory = entity.getCallHistory(); + synchronized (callHistory) { /* count while holding the list's own monitor */ + int size = Iterables.size(Iterables.filter(callHistory, Predicates.equalTo(effector))); /* count only calls to the requested effector, not a hard-coded name */ + assertTrue(size >= minSize, "size="+size); + } + }}); } }
