This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
The following commit(s) were added to refs/heads/master by this push:
new fe283dbe79 Add more dependent configuration wait modes
fe283dbe79 is described below
commit fe283dbe79ce641388d9f5271c944cb1390432a7
Author: Alex Heneveld <[email protected]>
AuthorDate: Mon Oct 6 14:33:49 2025 +0100
Add more dependent configuration wait modes
So on_fire does not break concurrently started items
---
.../brooklyn/camp/brooklyn/ConfigYamlTest.java | 66 ++--
.../core/sensor/DependentConfiguration.java | 347 +++++++++++----------
.../core/entity/DependentConfigurationTest.java | 87 ++++--
3 files changed, 281 insertions(+), 219 deletions(-)
diff --git
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
index 99ac2228fd..6cb5e5aa8e 100644
---
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
+++
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
@@ -33,10 +33,12 @@ import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityAsserts;
import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
+import org.apache.brooklyn.core.sensor.DependentConfiguration;
import org.apache.brooklyn.core.sensor.Sensors;
import org.apache.brooklyn.core.server.BrooklynServerConfig;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
+import org.apache.brooklyn.util.core.flags.TypeCoercions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.text.StringEscapes.JavaStringEscapes;
@@ -57,7 +59,7 @@ import java.util.concurrent.Executors;
import static org.testng.Assert.*;
public class ConfigYamlTest extends AbstractYamlTest {
-
+
private static final Logger LOG =
LoggerFactory.getLogger(ConfigYamlTest.class);
final static String DOUBLE_MAX_VALUE_TIMES_TEN = "" + Double.MAX_VALUE +
"0";
@@ -68,7 +70,7 @@ public class ConfigYamlTest extends AbstractYamlTest {
@Override
public void setUp() throws Exception {
super.setUp();
-
+
executor = Executors.newCachedThreadPool();
}
@@ -106,18 +108,18 @@ public class ConfigYamlTest extends AbstractYamlTest {
assertNull(entity.getMyField()); // field with @SetFromFlag
assertNull(entity.getMyField2()); // field with
@SetFromFlag("myField2Alias"), set using alias
}
-
+
@Test
public void testRecursiveConfigFailsGracefully() throws Exception {
doTestRecursiveConfigFailsGracefully(false);
}
-
+
@Test
public void testRecursiveConfigImmediateFailsGracefully() throws Exception
{
doTestRecursiveConfigFailsGracefully(true);
}
-
+
protected void doTestRecursiveConfigFailsGracefully(boolean immediate)
throws Exception {
String yaml = Joiner.on("\n").join(
"services:",
@@ -178,18 +180,18 @@ public class ConfigYamlTest extends AbstractYamlTest {
final Entity app = createStartWaitAndLogApplication(yaml);
TestEntity entity = (TestEntity)
Iterables.getOnlyElement(app.getChildren());
-
+
assertEquals(entity.config().get(TestEntity.CONF_NAME), "myName"); //
confName has @SetFromFlag("confName"); using full name
assertEquals(entity.config().get(TestEntity.CONF_OBJECT), "myObj"); //
confObject does not have @SetFromFlag
assertEquals(entity.config().get(TestEntity.CONF_STRING), "myString");
// set using the @SetFromFlag alias
-
- // The "dynamic" config key (i.e. not defined on the entity's type) is
not picked up to
+
+ // The "dynamic" config key (i.e. not defined on the entity's type) is
not picked up to
// be set on the entity if it's not inside the "brooklyn.config"
block. This isn't exactly
// desired behaviour, but it is what happens! This test is more to
demonstrate the behaviour
- // than to say it is definitely what we want! But like the comment at
the start of the
+ // than to say it is definitely what we want! But like the comment at
the start of the
// method says, this style is discouraged so we don't really care.
assertNull(entity.config().get(ConfigKeys.newStringConfigKey("test.confDynamic")));
// not defined on entity
-
+
// Again this isn't exactly desired behaviour, just a demonstration of
what happens!
// The names used in YAML correspond to fields with @SetFromFlag. The
values end up in the
// {@link EntitySpec#config} rather than {@link EntitySpec#flags}. The
field is not set.
@@ -212,13 +214,13 @@ public class ConfigYamlTest extends AbstractYamlTest {
final Entity app = createStartWaitAndLogApplication(yaml);
TestEntity entity = (TestEntity)
Iterables.getOnlyElement(app.getChildren());
-
+
// Task that resolves quickly
assertEquals(entity.config().get(TestEntity.CONF_MAP_PLAIN),
ImmutableMap.of("mykey", "myval"));
assertEquals(entity.config().get(TestEntity.CONF_LIST_PLAIN),
ImmutableList.of("myval"));
assertEquals(entity.config().get(TestEntity.CONF_SET_PLAIN),
ImmutableSet.of("myval"));
}
-
+
/**
* This tests config keys of type {@link
org.apache.brooklyn.core.config.MapConfigKey}, etc.
* It sets the value all in one go (as opposed to explicit sub-keys).
@@ -244,7 +246,7 @@ public class ConfigYamlTest extends AbstractYamlTest {
final Entity app = createStartWaitAndLogApplication(yaml);
TestEntity entity = (TestEntity)
Iterables.getOnlyElement(app.getChildren());
-
+
// Task that resolves quickly
assertEquals(entity.config().get(TestEntity.CONF_MAP_THING),
ImmutableMap.of("mykey", "myval"));
assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING),
ImmutableMap.of("mykey", "myval"));
@@ -253,7 +255,7 @@ public class ConfigYamlTest extends AbstractYamlTest {
assertEquals(entity.config().get(TestEntity.CONF_SET_THING),
ImmutableSet.of("myval"));
assertEquals(entity.config().get(TestEntity.CONF_SET_OBJ_THING),
ImmutableSet.of("myval"));
}
-
+
/**
* This tests config keys of type {@link
org.apache.brooklyn.core.config.MapConfigKey}, etc.
* It sets the value of each sub-key explicitly, rather than all in one go.
@@ -273,7 +275,7 @@ public class ConfigYamlTest extends AbstractYamlTest {
final Entity app = createStartWaitAndLogApplication(yaml);
TestEntity entity = (TestEntity)
Iterables.getOnlyElement(app.getChildren());
-
+
// Task that resolves quickly
assertEquals(entity.config().get(TestEntity.CONF_MAP_THING),
ImmutableMap.of("mykey", "myval"));
assertEquals(entity.config().get(TestEntity.CONF_MAP_OBJ_THING),
ImmutableMap.of("mykey", "myval"));
@@ -282,7 +284,7 @@ public class ConfigYamlTest extends AbstractYamlTest {
assertEquals(entity.config().get(TestEntity.CONF_SET_THING),
ImmutableSet.of("myval"));
assertEquals(entity.config().get(TestEntity.CONF_SET_OBJ_THING),
ImmutableSet.of("myval"));
}
-
+
@Test
public void testDeferredSupplierToConfig() throws Exception {
String yaml = Joiner.on("\n").join(
@@ -307,7 +309,7 @@ public class ConfigYamlTest extends AbstractYamlTest {
final Entity app = createStartWaitAndLogApplication(yaml);
TestEntity entity = (TestEntity)
Iterables.getOnlyElement(app.getChildren());
-
+
assertEquals(entity.config().get(TestEntity.CONF_NAME), "myOther");
assertEquals(entity.config().get(TestEntity.CONF_MAP_THING),
ImmutableMap.of("mykey", "myOther"));
assertEquals(entity.config().get(TestEntity.CONF_LIST_THING),
ImmutableList.of("myOther"));
@@ -316,7 +318,7 @@ public class ConfigYamlTest extends AbstractYamlTest {
assertEquals(entity.config().get(TestEntity.CONF_LIST_PLAIN),
ImmutableList.of("myOther"));
assertEquals(entity.config().get(TestEntity.CONF_SET_PLAIN),
ImmutableSet.of("myOther"));
}
-
+
@Test
public void testDeferredSupplierToAttributeWhenReady() throws Exception {
String yaml = Joiner.on("\n").join(
@@ -342,7 +344,7 @@ public class ConfigYamlTest extends AbstractYamlTest {
// Non-blocking calls will now return with the value
assertEquals(entity.config().getNonBlocking(TestEntity.CONF_NAME).get(),
"myOther");
}
-
+
/**
* This tests config keys of type {@link
org.apache.brooklyn.core.config.MapConfigKey}, etc.
* For plain maps, see {@link
#testDeferredSupplierToAttributeWhenReadyInPlainCollections()}
@@ -387,10 +389,10 @@ public class ConfigYamlTest extends AbstractYamlTest {
assertEquals(entity.config().getNonBlocking(TestEntity.CONF_LIST_THING).get(),
ImmutableList.of("myOther"));
assertEquals(entity.config().getNonBlocking(TestEntity.CONF_SET_THING).get(),
ImmutableSet.of("myOther"));
}
-
+
/**
* This tests config keys of type {@link java.util.Map}, etc.
- * For special types (e.g. {@link
org.apache.brooklyn.core.config.MapConfigKey}), see
+ * For special types (e.g. {@link
org.apache.brooklyn.core.config.MapConfigKey}), see
* {@link #testDeferredSupplierToAttributeWhenReadyInPlainCollections()}.
*/
@Test
@@ -453,8 +455,8 @@ public class ConfigYamlTest extends AbstractYamlTest {
@Test
public void testAttributeWhenReadyOptionsBasicOnOtherEntity() throws
Exception {
- String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { timeout: 10ms } ] } ] }";
- String v1 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { \"timeout\": \"10ms\" } ] } ] }";
+ String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { timeout: 10ms, timeout_if_on_fire: 0 } ]
} ] }";
+ String v1 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { \"timeout\": \"10ms\",
\"timeout_if_on_fire\": 0 } ] } ] }";
String yaml = Joiner.on("\n").join(
"services:",
@@ -484,8 +486,9 @@ public class ConfigYamlTest extends AbstractYamlTest {
sw = Stopwatch.createStarted();
Asserts.assertFailsWith(() ->
entity1.config().get(TestEntity.CONF_NAME),
Asserts.expectedFailureContainsIgnoreCase("Cannot resolve",
"$brooklyn:chain", " attributeWhenReady", "test.name", "0", "Resolving config
test.confName",
-// "Unsatisfied after ",
- "Abort due to", "on-fire"));
+// "Unsatisfied after "
+ "Abort due to"
+ , "on-fire"));
Asserts.assertThat(Duration.of(sw.elapsed()), d ->
d.isShorterThan(Duration.millis(999)));
// and source code
@@ -571,8 +574,8 @@ public class ConfigYamlTest extends AbstractYamlTest {
@Test
public void
testAttributeWhenReadyOptionsTimeoutIfDownResetsAndAbortsIfOnFire() throws
Exception {
- // was 10ms, but that is too short as there are 10ms sleeps while
stopping; 50ms is better
- String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { timeout: forever, timeout_if_down: 50ms
} ] } ] }";
+ // was 10ms, but that is too short as there are 10ms sleeps while
stopping
+ String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { timeout: forever, timeout_if_down:
250ms, timeout_if_on_fire: 0 } ] } ] }";
String yaml = Joiner.on("\n").join(
"services:",
@@ -589,8 +592,9 @@ public class ConfigYamlTest extends AbstractYamlTest {
entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.STOPPING);
Stopwatch sw = Stopwatch.createStarted();
new Thread(()->{
- Time.sleep(Duration.millis(10));
+ Time.sleep(Duration.millis(10)); // 100 to force detection after
start
entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.STOPPING);
+// Time.sleep(Duration.millis(100)); // + additional delay to
force timer running
entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.RUNNING); // will clear the timeout
Time.sleep(Duration.millis(10));
@@ -675,6 +679,12 @@ public class ConfigYamlTest extends AbstractYamlTest {
Asserts.assertThat(Duration.of(sw.elapsed()), d ->
d.isShorterThan(Duration.millis(999)));
}
+ @Test
+ public void testAttributeWhenReadyOptionsCoercion() throws Exception {
+ DependentConfiguration.AttributeWhenReadyOptions o =
TypeCoercions.coerce(DependentConfiguration.AttributeWhenReadyOptions.allowingOnFireMap(),
DependentConfiguration.AttributeWhenReadyOptions.class);
+ Asserts.assertFalse(o.abort_if_on_fire);
+ }
+
@Test
public void testConfigGoodNumericCoercions() throws Exception {
String yaml = Joiner.on("\n").join(
diff --git
a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
index bc669ee12c..57c7ce5515 100644
---
a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
+++
b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
@@ -25,7 +25,6 @@ import com.google.common.base.Functions;
import com.google.common.base.Preconditions;
import com.google.common.base.Predicate;
import com.google.common.base.Predicates;
-import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
@@ -38,9 +37,6 @@ import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
import org.apache.brooklyn.api.mgmt.TaskFactory;
import org.apache.brooklyn.api.sensor.AttributeSensor;
-import org.apache.brooklyn.api.sensor.SensorEvent;
-import org.apache.brooklyn.api.sensor.SensorEventListener;
-import org.apache.brooklyn.config.ConfigKey;
import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
import org.apache.brooklyn.core.entity.EntityInternal;
@@ -72,12 +68,10 @@ import org.apache.brooklyn.util.guava.Maybe;
import org.apache.brooklyn.util.guava.Maybe.Absent;
import org.apache.brooklyn.util.javalang.JavaClassNames;
import org.apache.brooklyn.util.net.Urls;
-import org.apache.brooklyn.util.text.StringFunctions;
import org.apache.brooklyn.util.text.StringFunctions.RegexReplacer;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.CountdownTimer;
import org.apache.brooklyn.util.time.Duration;
-import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -163,24 +157,38 @@ public class DependentConfiguration {
}
public static class AttributeWhenReadyOptions {
+ public Duration timeout;
+
@JsonAlias("timeoutIfDown")
public Duration timeout_if_down;
- public Duration timeout;
+ @JsonAlias("timeoutIfDownInitial")
+ public Duration timeout_if_down_initial;
+
+ @JsonAlias("timeoutIfOnFire")
+ public Duration timeout_if_on_fire;
+ @JsonAlias("timeoutIfOnFireInitial")
+ public Duration timeout_if_on_fire_initial;
@JsonAlias("abortIfOnFire")
public boolean abort_if_on_fire = true;
@JsonAlias("waitForTruthy")
public boolean wait_for_truthy = true;
+ // we might want an additional wait_on_timeout_conditions to prevent
resolution if any of the timeout conditions are true;
+ // that could be used to prevent a value from an on-fire or down
entity being used;
+ // not generally needed though as call pattern can be to wait on
service lifecycle being explicitly RUNNING, eg via latches
+
public static AttributeWhenReadyOptions defaultOptions() {
AttributeWhenReadyOptions result = new AttributeWhenReadyOptions();
result.abort_if_on_fire = true;
+ result.timeout_if_on_fire = Duration.ZERO;
+ result.timeout_if_on_fire_initial = Duration.seconds(15); //
plenty of time for concurrently started dependencies to transition to started
result.timeout_if_down = Duration.ONE_MINUTE;
return result;
}
public static Map allowingOnFireMap() {
- return MutableMap.of("timeout", "forever");
+ return MutableMap.of("timeout", "forever", "abort_if_on_fire",
false);
}
}
@@ -227,7 +235,7 @@ public class DependentConfiguration {
*/
@Deprecated
public static <T,V> Task<V> attributePostProcessedWhenReady(final Entity
source, final AttributeSensor<T> sensor, final Predicate<? super T> ready,
final Closure<V> postProcess) {
- return attributePostProcessedWhenReady(source, sensor, ready,
GroovyJavaMethods.<T,V>functionFromClosure(postProcess));
+ return attributePostProcessedWhenReady(source, sensor, ready,
GroovyJavaMethods.functionFromClosure(postProcess));
}
@SuppressWarnings("unchecked")
@@ -245,19 +253,23 @@ public class DependentConfiguration {
return builder.build();
}
+ @Deprecated // since 1.1 use builder
public static <T> T waitInTaskForAttributeReady(Entity source,
AttributeSensor<T> sensor, Predicate<? super T> ready) {
- return waitInTaskForAttributeReady(source, sensor, ready,
ImmutableList.<AttributeAndSensorCondition<?>>of());
+ return waitInTaskForAttributeReady(source, sensor, ready,
ImmutableList.of());
}
+ @Deprecated // since 1.1 use builder
public static <T> T waitInTaskForAttributeReady(final Entity source, final
AttributeSensor<T> sensor, Predicate<? super T> ready,
List<AttributeAndSensorCondition<?>> abortConditions) {
- String blockingDetails = "Waiting for ready from "+source+" "+sensor+"
(subscription)";
- return waitInTaskForAttributeReady(source, sensor, ready,
abortConditions, blockingDetails);
+ return (T) waitInTaskForAttributeReady(source, sensor, ready, (List)
abortConditions, "Waiting for ready from "+source+" "+sensor+" (subscription)");
}
// TODO would be nice to have an easy semantics for whenServiceUp (cf
DynamicWebAppClusterImpl.whenServiceUp)
- public static <T> T waitInTaskForAttributeReady(final Entity source, final
AttributeSensor<T> sensor, Predicate<? super T> ready,
List<AttributeAndSensorCondition<?>> abortConditions, String blockingDetails) {
- return new WaitInTaskForAttributeReady<T,T>(source, sensor, ready,
abortConditions, blockingDetails).call();
+ @Deprecated // since 1.1 use builder
+ public static <T> T waitInTaskForAttributeReady(final Entity source, final
AttributeSensor<T> sensor, Predicate<? super T> ready,
List<AttributeAndSensorCondition> abortConditions, String blockingDetails) {
+ Builder<T,T> b = builder().attributeWhenReadyNoOptions(source,
sensor).readiness(ready).blockingDetails(blockingDetails).timeout(Duration.PRACTICALLY_FOREVER);
+ if (abortConditions!=null) abortConditions.forEach(c ->
b.abortIf(c.source, c.sensor, c.predicate));
+ return new WaitInTaskForAttributeReady<>(b).call();
}
protected static class WaitInTaskForAttributeReady<T,V> implements
Callable<V> {
@@ -271,8 +283,7 @@ public class DependentConfiguration {
protected final Entity source;
protected final AttributeSensor<T> sensor;
protected final Predicate<? super T> ready;
- protected final List<AttributeAndSensorCondition<?>>
abortSensorConditions;
- protected List<Pair<AttributeAndSensorCondition<Object>,Duration>>
timeoutIfTimeoutSensorConditions = null;
+ protected List<AttributeAndSensorConditionWithTimeouts<Object>>
timeoutIfTimeoutSensorConditions;
protected final String blockingDetails;
protected final Function<? super T,? extends V> postProcess;
protected final Duration timeout;
@@ -285,7 +296,6 @@ public class DependentConfiguration {
this.source = builder.source;
this.sensor = builder.sensor;
this.ready = builder.readiness;
- this.abortSensorConditions = builder.abortSensorConditions;
this.timeoutIfTimeoutSensorConditions =
builder.timeoutIfTimeoutSensorConditions;
this.blockingDetails = builder.blockingDetails;
this.postProcess = builder.postProcess;
@@ -295,22 +305,6 @@ public class DependentConfiguration {
this.onUnmanaged = builder.onUnmanaged;
}
- private WaitInTaskForAttributeReady(Entity source, AttributeSensor<T>
sensor, Predicate<? super T> ready,
- List<AttributeAndSensorCondition<?>> abortConditions, String
blockingDetails) {
- this.source = source;
- this.sensor = sensor;
- this.ready = ready;
- this.abortSensorConditions = abortConditions;
- this.blockingDetails = blockingDetails;
-
- this.timeout = Duration.PRACTICALLY_FOREVER;
- this.timeoutIfTimeoutSensorConditions = null;
- this.onTimeout = Maybe.absent();
- this.ignoreUnmanaged = DEFAULT_IGNORE_UNMANAGED;
- this.onUnmanaged = Maybe.absent();
- this.postProcess = null;
- }
-
@SuppressWarnings("unchecked")
protected V postProcess(T value) {
if (this.postProcess!=null) return postProcess.apply(value);
@@ -338,17 +332,27 @@ public class DependentConfiguration {
throw new RuntimeTimeoutException("Waiting not permitted");
}
- final List<Exception> abortionExceptions =
Lists.newCopyOnWriteArrayList();
long start = System.currentTimeMillis();
- for (AttributeAndSensorCondition abortCondition :
abortSensorConditions) {
- Object currentValue =
abortCondition.source.getAttribute(abortCondition.sensor);
- if (abortCondition.predicate.apply(currentValue)) {
- abortionExceptions.add(new Exception("Abort due to
"+abortCondition+": "+currentValue));
+ final List<Exception> abortImmediatelyExceptions =
Lists.newCopyOnWriteArrayList();
+ Map<Integer,Duration> customTimeouts = MutableMap.of();
+ if (timeoutIfTimeoutSensorConditions!=null) {
+ for (int i=0; i<timeoutIfTimeoutSensorConditions.size(); i++) {
+ AttributeAndSensorConditionWithTimeouts<Object>
timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(i);
+ if (timeoutIfCondition.timeoutInitial!=null) {
+ Object currentValue =
timeoutIfCondition.source.getAttribute(timeoutIfCondition.sensor);
+ if (timeoutIfCondition.predicate.apply(currentValue)) {
+ if
(Duration.ZERO.equals(timeoutIfCondition.timeoutInitial)) {
+ abortImmediatelyExceptions.add(new
Exception("Abort due to " + timeoutIfCondition + ": " + currentValue));
+ } else {
+ customTimeouts.put(i,
timeoutIfCondition.timeoutInitial);
+ }
+ }
+ }
}
}
- if (!abortionExceptions.isEmpty()) {
- throw new CompoundRuntimeException("Aborted waiting for ready
value from "+source+" "+sensor.getName(), abortionExceptions);
+ if (!abortImmediatelyExceptions.isEmpty()) {
+ throw new CompoundRuntimeException("Aborted waiting for ready
value from "+source+" "+sensor.getName(), abortImmediatelyExceptions);
}
TaskInternal<?> current = (TaskInternal<?>) Tasks.current();
@@ -357,83 +361,77 @@ public class DependentConfiguration {
if (entity == null) throw new IllegalStateException("Should only
be invoked in a running task with an entity tag; "+
current+" has no entity tag
("+current.getStatusDetail(false)+")");
- final LinkedList<T> publishedValues = new LinkedList<T>();
+ final LinkedList<T> publishedValues = new LinkedList<>();
final Semaphore semaphore = new Semaphore(0); // could use
Exchanger
SubscriptionHandle subscription = null;
List<SubscriptionHandle> thisWaitSubscriptions =
Lists.newArrayList();
try {
- subscription = entity.subscriptions().subscribe(source,
sensor, new SensorEventListener<T>() {
- @Override public void onEvent(SensorEvent<T> event) {
- synchronized (publishedValues) {
publishedValues.add(event.getValue()); }
- semaphore.release();
- }});
-
- for (final AttributeAndSensorCondition abortCondition :
abortSensorConditions) {
-
thisWaitSubscriptions.add(entity.subscriptions().subscribe(abortCondition.source,
abortCondition.sensor, new SensorEventListener<Object>() {
- @Override public void onEvent(SensorEvent<Object>
event) {
- if
(abortCondition.predicate.apply(event.getValue())) {
- abortionExceptions.add(new Exception("Abort
due to "+abortCondition+": "+event.getValue()));
- semaphore.release();
- }
- }}));
- Object currentValue =
abortCondition.source.getAttribute(abortCondition.sensor);
- if (abortCondition.predicate.apply(currentValue)) {
- abortionExceptions.add(new Exception("Abort due to
"+abortCondition+": "+currentValue));
- }
- }
- if (!abortionExceptions.isEmpty()) {
- throw new CompoundRuntimeException("Aborted waiting for
ready value from "+source+" "+sensor.getName(), abortionExceptions);
- }
+ subscription = entity.subscriptions().subscribe(source,
sensor, event -> {
+ synchronized (publishedValues) {
publishedValues.add(event.getValue()); }
+ semaphore.release();
+ });
- CountdownTimer timer = timeout!=null ?
timeout.countdownTimer() : Duration.PRACTICALLY_FOREVER.countdownTimer();
+ final CountdownTimer timer = timeout!=null ?
timeout.countdownTimer() : Duration.PRACTICALLY_FOREVER.countdownTimer();
- Map<Integer,Duration> customTimeouts = MutableMap.of();
BiConsumer<Integer,Object> checkValueAtIndex = (index, val) ->
{
synchronized (customTimeouts) {
- Pair<AttributeAndSensorCondition<Object>, Duration>
timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
- if (timeoutIfCondition.getLeft().predicate.apply(val))
{
- if (!customTimeouts.containsKey(index)) {
- // start timer from this point
- customTimeouts.put(index,
timer.getDurationElapsed().add(timeoutIfCondition.getRight()));
+ AttributeAndSensorConditionWithTimeouts<Object>
timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
+ if (timeoutIfCondition.predicate.apply(val)) {
+ if (timeoutIfCondition.timeout!=null) {
+ if (!customTimeouts.containsKey(index)) {
+ // start timer from this point
+ Duration customTimeout =
Duration.ZERO.equals(timeoutIfCondition.timeout) ? Duration.ZERO :
timer.getDurationElapsed().add(timeoutIfCondition.timeout);
+ if
(timeoutIfCondition.timeoutInitial!=null) customTimeout =
Duration.max(timeoutIfCondition.timeoutInitial, customTimeout);
+ if (Duration.ZERO.equals(customTimeout)) {
+ abortImmediatelyExceptions.add(new
Exception("Abort due to " + timeoutIfCondition + ": " + val));
+ } else {
+ customTimeouts.put(index,
customTimeout);
+ }
+ }
+ } else {
+ // if timeout not set, it is only enabled for
'initial'; don't do anything (only remove if condition becomes false)
}
} else {
customTimeouts.remove(index);
}
}
};
-
if (timeoutIfTimeoutSensorConditions!=null) {
for (int i=0; i<timeoutIfTimeoutSensorConditions.size();
i++) {
int index = i;
- Pair<AttributeAndSensorCondition<Object>, Duration>
timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
+ AttributeAndSensorConditionWithTimeouts<Object>
timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
-
thisWaitSubscriptions.add(entity.subscriptions().subscribe(timeoutIfCondition.getLeft().source,
timeoutIfCondition.getLeft().sensor, new SensorEventListener<Object>() {
- @Override public void onEvent(SensorEvent<Object>
event) {
- checkValueAtIndex.accept(index,
event.getValue());
- }}));
+
thisWaitSubscriptions.add(entity.subscriptions().subscribe(timeoutIfCondition.source,
timeoutIfCondition.sensor, event -> {
+ checkValueAtIndex.accept(index, event.getValue());
+ semaphore.release(); // indicate that timeouts
need to be checked again
+ }));
- Object val =
timeoutIfCondition.getLeft().source.getAttribute(timeoutIfCondition.getLeft().sensor);
+ Object val =
timeoutIfCondition.source.getAttribute(timeoutIfCondition.sensor);
checkValueAtIndex.accept(index, val);
}
+ if (!abortImmediatelyExceptions.isEmpty()) {
+ throw new CompoundRuntimeException("Aborted waiting
for ready value from "+source+" "+sensor.getName(), abortImmediatelyExceptions);
+ }
}
Duration maxPeriod = ValueResolver.PRETTY_QUICK_WAIT;
Duration nextPeriod = ValueResolver.REAL_QUICK_PERIOD;
- while (true) {
+ outer: while (true) {
// check the source on initial run (could be done outside
the loop)
// and also (optionally) on each iteration in case it is
more recent
value = source.getAttribute(sensor);
if (ready(value)) break;
- if (timer!=null) {
- if
(timer.getDurationRemaining().isShorterThan(nextPeriod)) {
- nextPeriod = timer.getDurationRemaining();
- }
- if (timer.isExpired()) {
- if (onTimeout.isPresent()) return onTimeout.get();
- throw new RuntimeTimeoutException("Unsatisfied
after "+Duration.sinceUtc(start));
+ if
(timer.getDurationRemaining().isShorterThan(nextPeriod)) {
+ nextPeriod = timer.getDurationRemaining();
+ }
+ if (timer.isExpired()) {
+ if (!abortImmediatelyExceptions.isEmpty()) {
+ throw new CompoundRuntimeException("Aborted
waiting for ready value from "+source+" "+sensor.getName(),
abortImmediatelyExceptions);
}
+ if (onTimeout.isPresent()) return onTimeout.get();
+ throw new RuntimeTimeoutException("Unsatisfied after
"+Duration.sinceUtc(start));
}
String prevBlockingDetails =
current.setBlockingDetails(blockingDetails);
@@ -454,7 +452,13 @@ public class DependentConfiguration {
if (publishedValues.isEmpty()) break;
value = publishedValues.pop();
}
- if (ready(value)) break;
+ if (ready(value)) break outer;
+ if (!abortImmediatelyExceptions.isEmpty()) {
+ throw new CompoundRuntimeException("Aborted
waiting for ready value from "+source+" "+sensor.getName(),
abortImmediatelyExceptions);
+ }
+ }
+ if (!abortImmediatelyExceptions.isEmpty()) {
+ throw new CompoundRuntimeException("Aborted waiting
for ready value from "+source+" "+sensor.getName(), abortImmediatelyExceptions);
}
// if unmanaged then ignore the other abort conditions
@@ -463,10 +467,6 @@ public class DependentConfiguration {
throw new NotManagedException(entity);
}
- if (!abortionExceptions.isEmpty()) {
- throw new CompoundRuntimeException("Aborted waiting
for ready value from "+source+" "+sensor.getName(), abortionExceptions);
- }
-
Set<Map.Entry<Integer, Duration>> timeoutsHere = null;
synchronized (customTimeouts) {
if (!customTimeouts.isEmpty()) {
@@ -477,13 +477,16 @@ public class DependentConfiguration {
for (Map.Entry<Integer, Duration> entry :
timeoutsHere) {
Integer index = entry.getKey();
Duration specialTimeout = entry.getValue();
- Pair<AttributeAndSensorCondition<Object>,
Duration> timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
+ AttributeAndSensorConditionWithTimeouts<Object>
timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
if
(timer.getDurationElapsed().isLongerThan(specialTimeout)) {
- Object val =
timeoutIfCondition.getLeft().source.getAttribute(timeoutIfCondition.getLeft().sensor);
- if
(timeoutIfCondition.getLeft().predicate.apply(val)) {
+ Object val =
timeoutIfCondition.source.getAttribute(timeoutIfCondition.sensor);
+ if (timeoutIfCondition.predicate.apply(val)) {
+ if (!abortImmediatelyExceptions.isEmpty())
{
+ throw new
CompoundRuntimeException("Aborted waiting for ready value from "+source+"
"+sensor.getName(), abortImmediatelyExceptions);
+ }
if (onTimeout.isPresent()) continue;
throw new
RuntimeTimeoutException("Unsatisfied after " + Duration.sinceUtc(start) + "
(tighter timeout due to " +
- timeoutIfCondition.getLeft() + ",
with value " + val + ")");
+ timeoutIfCondition + ", with value
" + val + ")");
}
}
}
@@ -513,14 +516,14 @@ public class DependentConfiguration {
*/
@Deprecated
public static <T> Task<T> whenDone(Callable<T> job) {
- return new BasicTask<T>(MutableMap.of("tag", "whenDone",
"displayName", "waiting for job"), job);
+ return new BasicTask<>(MutableMap.of("tag", "whenDone", "displayName",
"waiting for job"), job);
}
/**
* Returns a {@link Task} which waits for the result of first parameter,
then applies the function in the second
* parameter to it, returning that result.
- *
- * Particular useful in Entity configuration where config will block until
Tasks have completed,
+ * <p>
+ * Particularly useful in Entity configuration where config will block
until Tasks have completed,
* allowing for example an {@link #attributeWhenReady(Entity,
AttributeSensor, Predicate)} expression to be
* passed in the first argument then transformed by the function in the
second argument to generate
* the value that is used for the configuration
@@ -545,14 +548,12 @@ public class DependentConfiguration {
*/
@SuppressWarnings({ "rawtypes" })
public static <U,T> Task<T> transform(final Map flags, final
TaskAdaptable<U> task, final Function<U,T> transformer) {
- return new BasicTask<T>(flags, new Callable<T>() {
- @Override
- public T call() throws Exception {
- if (!task.asTask().isSubmitted()) {
-
BasicExecutionContext.getCurrentExecutionContext().submit(task);
- }
- return transformer.apply(task.asTask().get());
- }});
+ return new BasicTask<>(flags, () -> {
+ if (!task.asTask().isSubmitted()) {
+
BasicExecutionContext.getCurrentExecutionContext().submit(task);
+ }
+ return transformer.apply(task.asTask().get());
+ });
}
/** Returns a task which waits for multiple other tasks (submitting if
necessary)
@@ -600,7 +601,7 @@ public class DependentConfiguration {
}
});
}
- return transform(flags, new ParallelTask<U>(tasks), transformer);
+ return transform(flags, new ParallelTask<>(tasks), transformer);
}
@@ -629,9 +630,8 @@ public class DependentConfiguration {
}
return transformMultiple(
- MutableMap.<String,String>of("displayName", "formatting
'"+spec.toString()+"' with "+taskArgs.size()+"
task"+(taskArgs.size()!=1?"s":"")),
- new Function<List<Object>, String>() {
- @Override public String apply(List<Object> input) {
+ MutableMap.of("displayName", "formatting '"+spec.toString()+"'
with "+taskArgs.size()+" task"+(taskArgs.size()!=1?"s":"")),
+ input -> {
Iterator<?> tri = input.iterator();
Object[] vv = new Object[newArgs.length];
int i=0;
@@ -642,7 +642,7 @@ public class DependentConfiguration {
i++;
}
return String.format(vv[0].toString(),
Arrays.copyOfRange(vv, 1, vv.length));
- }},
+ },
taskArgs);
}
@@ -659,7 +659,7 @@ public class DependentConfiguration {
List<Object> resolvedArgs = Lists.newArrayList();
for (Object arg : args) {
Maybe<?> argVal = resolveImmediately(arg);
- if (argVal.isAbsent()) return Maybe.Absent.castAbsent(argVal);
+ if (argVal.isAbsent()) return Absent.castAbsent(argVal);
resolvedArgs.add(argVal.get());
}
@@ -672,7 +672,7 @@ public class DependentConfiguration {
public static Maybe<String> urlEncodeImmediately(Object arg) {
Maybe<?> resolvedArg = resolveImmediately(arg);
if (resolvedArg.isAbsent()) return Absent.castAbsent(resolvedArg);
- if (resolvedArg.isNull()) return Maybe.<String>of((String)null);
+ if (resolvedArg.isNull()) return Maybe.of((String)null);
String resolvedString = resolvedArg.get().toString();
return Maybe.of(Urls.encode(resolvedString));
@@ -690,13 +690,13 @@ public class DependentConfiguration {
else if (arg instanceof TaskFactory) taskArgs.add(
((TaskFactory<TaskAdaptable<Object>>)arg).newTask() );
return transformMultiple(
- MutableMap.<String,String>of("displayName", "url-escaping
'"+arg),
+ MutableMap.of("displayName", "url-escaping '"+arg),
new Function<List<Object>, String>() {
@Override
@Nullable
public String apply(@Nullable List<Object> input) {
Object resolvedArg;
- if (arg instanceof TaskAdaptable || arg instanceof
TaskFactory) resolvedArg = Iterables.getOnlyElement(input);
+ if (input != null && (arg instanceof TaskAdaptable ||
arg instanceof TaskFactory)) resolvedArg = Iterables.getOnlyElement(input);
else if (arg instanceof DeferredSupplier) resolvedArg
= ((DeferredSupplier<?>) arg).get();
else resolvedArg = arg;
@@ -710,17 +710,14 @@ public class DependentConfiguration {
public static Task<Object> external(ManagementContext mgmt, final Object
provider, final Object key) {
List<TaskAdaptable<Object>> argsNeedingAdaptation =
getTaskAdaptable(provider, key);
- return Tasks.<Object>builder()
+ return Tasks.builder()
.displayName("resolving external configuration: '" + key + "'
from provider '" + provider + "'")
.dynamic(false)
- .body(new Callable<Object>() {
- @Override
- public Object call() throws Exception {
- Iterator<TaskAdaptable<Object>> ai =
argsNeedingAdaptation.iterator();
- return
((ManagementContextInternal)mgmt).getExternalConfigProviderRegistry().getConfig(
- resolveArgument(provider, ai),
- resolveArgument(key, ai));
- }
+ .body(() -> {
+ Iterator<TaskAdaptable<Object>> ai =
argsNeedingAdaptation.iterator();
+ return
((ManagementContextInternal)mgmt).getExternalConfigProviderRegistry().getConfig(
+ resolveArgument(provider, ai),
+ resolveArgument(key, ai));
})
.build();
}
@@ -761,7 +758,7 @@ public class DependentConfiguration {
if (resolvedReplacement.isAbsent()) return
Absent.castAbsent(resolvedReplacement);
String resolvedReplacementStr =
String.valueOf(resolvedReplacement.get());
- String result = new StringFunctions.RegexReplacer(resolvedPatternStr,
resolvedReplacementStr).apply(resolvedSourceStr);
+ String result = new RegexReplacer(resolvedPatternStr,
resolvedReplacementStr).apply(resolvedSourceStr);
return Maybe.of(result);
}
@@ -784,8 +781,8 @@ public class DependentConfiguration {
if (resolvedReplacement.isAbsent()) return
Absent.castAbsent(resolvedReplacement);
String resolvedReplacementStr =
String.valueOf(resolvedReplacement.get());
- RegexReplacer result = new
StringFunctions.RegexReplacer(resolvedPatternStr, resolvedReplacementStr);
- return Maybe.<Function<String, String>>of(result);
+ RegexReplacer result = new RegexReplacer(resolvedPatternStr,
resolvedReplacementStr);
+ return Maybe.of(result);
}
public static Task<Function<String, String>> regexReplacement(Object
pattern, Object replacement) {
@@ -804,7 +801,7 @@ public class DependentConfiguration {
Integer resolvedMaxThreadsInt =
TypeCoercions.coerce(resolvedMaxThreads, Integer.class);
ReleaseableLatch result =
ReleaseableLatch.Factory.newMaxConcurrencyLatch(resolvedMaxThreadsInt);
- return Maybe.<ReleaseableLatch>of(result);
+ return Maybe.of(result);
}
public static Task<ReleaseableLatch> maxConcurrency(Object maxThreads) {
@@ -845,11 +842,12 @@ public class DependentConfiguration {
@Nullable
@Override
public String apply(@Nullable List<Object> input) {
+ if (input==null) return null;
Iterator<?> taskArgsIterator = input.iterator();
String resolvedSource = resolveArgument(source, taskArgsIterator);
String resolvedPattern = resolveArgument(pattern,
taskArgsIterator);
String resolvedReplacement = resolveArgument(replacement,
taskArgsIterator);
- return new StringFunctions.RegexReplacer(resolvedPattern,
resolvedReplacement).apply(resolvedSource);
+ return new RegexReplacer(resolvedPattern,
resolvedReplacement).apply(resolvedSource);
}
}
@@ -866,8 +864,9 @@ public class DependentConfiguration {
@Override
public Function<String, String> apply(List<Object> input) {
+ if (input==null) return null;
Iterator<?> taskArgsIterator = input.iterator();
- return new StringFunctions.RegexReplacer(resolveArgument(pattern,
taskArgsIterator), resolveArgument(replacement, taskArgsIterator));
+ return new RegexReplacer(resolveArgument(pattern,
taskArgsIterator), resolveArgument(replacement, taskArgsIterator));
}
}
@@ -881,6 +880,7 @@ public class DependentConfiguration {
@Override
public ReleaseableLatch apply(List<Object> input) {
+ if (input==null) return null;
Iterator<?> taskArgsIterator = input.iterator();
Integer maxThreadsNum = resolveArgument(maxThreads,
taskArgsIterator, Integer.class);
return
ReleaseableLatch.Factory.newMaxConcurrencyLatch(maxThreadsNum);
@@ -897,7 +897,7 @@ public class DependentConfiguration {
/**
* Resolves the argument as follows:
- *
+ * <p>
* If the argument is a DeferredSupplier, we will block and wait for it to
resolve. If the argument is TaskAdaptable or TaskFactory,
* we will assume that the resolved task has been queued on the {@code
taskArgsIterator}, otherwise the argument has already been resolved.
*
@@ -927,7 +927,7 @@ public class DependentConfiguration {
*/
@Deprecated
public static <T> Task<List<T>> listAttributesWhenReady(AttributeSensor<T>
sensor, Iterable<Entity> entities, Closure<Boolean> readiness) {
- Predicate<Object> readinessPredicate = (readiness != null) ?
GroovyJavaMethods.<Object>predicateFromClosure(readiness) :
JavaGroovyEquivalents.groovyTruthPredicate();
+ Predicate<Object> readinessPredicate = (readiness != null) ?
GroovyJavaMethods.predicateFromClosure(readiness) :
JavaGroovyEquivalents.groovyTruthPredicate();
return listAttributesWhenReady(sensor, entities, readinessPredicate);
}
@@ -951,7 +951,7 @@ public class DependentConfiguration {
try {
return (T) Tasks.resolveValue(t, Object.class,
((EntityInternal)context).getExecutionContext(), contextMessage);
} catch (ExecutionException e) {
- throw Throwables.propagate(e);
+ throw Exceptions.propagate(e);
}
}
@@ -972,6 +972,20 @@ public class DependentConfiguration {
}
}
+ public static class AttributeAndSensorConditionWithTimeouts<T> extends
AttributeAndSensorCondition<T> {
+ /** timeout used once subscription is established, subject to any
timeoutInitial; if unset the condition is only checked at start */
+ protected final Duration timeout;
+ /** timeout used if condition is true prior to subscription being
established, and also used as a minimum period from subscription start
+ * to which any {@link #timeout} is extended; eg if this is 5m but
timeout is 1m, a failure at the 3m mark will wait 2m;
+ * after 4m all failures will wait 1m. */
+ protected final Duration timeoutInitial;
+ public AttributeAndSensorConditionWithTimeouts(Entity source,
AttributeSensor<T> sensor, Predicate<? super T> predicate, Duration timeout,
Duration timeoutInitial) {
+ super(source, sensor, predicate);
+ this.timeout = timeout;
+ this.timeoutInitial = timeoutInitial;
+ }
+ }
+
public static ProtoBuilder builder() {
return new ProtoBuilder();
}
@@ -1004,14 +1018,14 @@ public class DependentConfiguration {
* Will wait for the attribute on the given entity, not aborting when
it goes {@link Lifecycle#ON_FIRE}, no timeout.
*/
public <T2> Builder<T2,T2> attributeWhenReadyNoOptions(Entity source,
AttributeSensor<T2> sensor) {
- return new Builder<T2,T2>(source, sensor);
+ return new Builder<>(source, sensor);
}
/**
* Alias for {@link #attributeWhenReadyNoOptions(Entity,
AttributeSensor)}
*/
public <T2> Builder<T2,T2> attributeWhenReadyAllowingOnFire(Entity
source, AttributeSensor<T2> sensor) {
- return new Builder<T2,T2>(source, sensor);
+ return new Builder<>(source, sensor);
}
/** Constructs a builder for task for parallel execution returning a
list of values of the given sensor list on the given entity,
@@ -1023,7 +1037,7 @@ public class DependentConfiguration {
/** As {@link #attributeWhenReadyFromMultiple(Iterable,
AttributeSensor)} with an explicit readiness test. */
@Beta
public <T> MultiBuilder<T, T, List<T>>
attributeWhenReadyFromMultiple(Iterable<? extends Entity> sources,
AttributeSensor<T> sensor, Predicate<? super T> readiness) {
- return new MultiBuilder<T, T, List<T>>(sources, sensor, readiness);
+ return new MultiBuilder<>(sources, sensor, readiness);
}
}
@@ -1035,8 +1049,7 @@ public class DependentConfiguration {
protected AttributeSensor<T> sensor;
protected Predicate<? super T> readiness;
protected Function<? super T, ? extends V> postProcess;
- protected List<AttributeAndSensorCondition<?>> abortSensorConditions =
Lists.newArrayList();
- protected List<Pair<AttributeAndSensorCondition<Object>,Duration>>
timeoutIfTimeoutSensorConditions = null;
+ protected List<AttributeAndSensorConditionWithTimeouts<Object>>
timeoutIfTimeoutSensorConditions = null;
protected String blockingDetails;
protected Duration timeout;
protected Maybe<V> onTimeout = Maybe.absent();
@@ -1078,8 +1091,7 @@ public class DependentConfiguration {
return abortIf(source, sensor,
JavaGroovyEquivalents.groovyTruthPredicate());
}
public <T2> Builder<T,V> abortIf(Entity source, AttributeSensor<T2>
sensor, Predicate<? super T2> predicate) {
- abortSensorConditions.add(new
AttributeAndSensorCondition<T2>(source, sensor, predicate));
- return this;
+ return timeoutIf(source, sensor, predicate, Duration.ZERO,
Duration.ZERO);
}
/** Causes the depender to abort immediately if {@link
Attributes#SERVICE_STATE_ACTUAL}
* is {@link Lifecycle#ON_FIRE}. */
@@ -1087,10 +1099,19 @@ public class DependentConfiguration {
abortIf(source, Attributes.SERVICE_STATE_ACTUAL,
Predicates.equalTo(Lifecycle.ON_FIRE));
return this;
}
+ public Builder<T,V> timeoutIfOnFire(Duration time, Duration
timeInitial) {
+ if (time==null && timeInitial==null) return this;
+ timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL,
Predicates.equalTo(Lifecycle.ON_FIRE), time, timeInitial);
+ return this;
+ }
/** Causes the depender to timeout after the given time if {@link
Attributes#SERVICE_STATE_ACTUAL}
* is not starting or running */
public Builder<T,V> timeoutIfDown(Duration time) {
- timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL,
Predicates.in(MutableList.of(Lifecycle.STOPPING, Lifecycle.STOPPED,
Lifecycle.DESTROYED, Lifecycle.ON_FIRE, Lifecycle.CREATED, null)), time);
+ return timeoutIfDown(time, null);
+ }
+ public Builder<T,V> timeoutIfDown(Duration time, Duration timeInitial)
{
+ if (time==null && timeInitial==null) return this;
+ timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL,
Predicates.in(MutableList.of(Lifecycle.STOPPING, Lifecycle.STOPPED,
Lifecycle.DESTROYED, Lifecycle.ON_FIRE, Lifecycle.CREATED, null)), time,
timeInitial);
return this;
}
@@ -1101,8 +1122,17 @@ public class DependentConfiguration {
public Builder<T,V> options(AttributeWhenReadyOptions options) {
if (options!=null) {
if (options.timeout!=null) timeout(options.timeout);
- if (options.timeout_if_down != null)
timeoutIfDown(options.timeout_if_down);
- if (Boolean.TRUE.equals(options.abort_if_on_fire))
abortIfOnFire();
+ timeoutIfDown(options.timeout_if_down,
options.timeout_if_down_initial);
+
+ if (options.timeout_if_on_fire==null &&
options.timeout_if_on_fire_initial==null) {
+ if (Boolean.TRUE.equals(options.abort_if_on_fire)) {
+ AttributeWhenReadyOptions defaultOptions =
AttributeWhenReadyOptions.defaultOptions();
+ timeoutIfOnFire(defaultOptions.timeout_if_on_fire,
defaultOptions.timeout_if_on_fire_initial);
+ }
+ // otherwise nothing
+ } else {
+ timeoutIfOnFire(options.timeout_if_on_fire,
options.timeout_if_on_fire_initial);
+ }
if (!options.wait_for_truthy) {
readiness = Predicates.alwaysTrue();
@@ -1121,9 +1151,12 @@ public class DependentConfiguration {
return this;
}
/** specifies the supplied timeout if the condition is met */
- public <T2> Builder<T,V> timeoutIf(Entity source, AttributeSensor<T2>
sensor, Predicate<? super T2> predicate, Duration val) {
+ public <T2> Builder<T,V> timeoutIf(Entity source, AttributeSensor<T2>
sensor, Predicate<? super T2> predicate, Duration timeout) {
+ return timeoutIf(source, sensor, predicate, timeout, timeout);
+ }
+ public <T2> Builder<T,V> timeoutIf(Entity source, AttributeSensor<T2>
sensor, Predicate<? super T2> predicate, Duration timeout, Duration
timeoutInitial) {
if (timeoutIfTimeoutSensorConditions==null)
timeoutIfTimeoutSensorConditions = MutableList.of();
- timeoutIfTimeoutSensorConditions.add(Pair.of(new
AttributeAndSensorCondition(source, sensor, predicate), val));
+ timeoutIfTimeoutSensorConditions.add(new
AttributeAndSensorConditionWithTimeouts(source, sensor, predicate, timeout,
timeoutInitial));
return this;
}
public Builder<T,V> onTimeoutReturn(V val) {
@@ -1131,7 +1164,7 @@ public class DependentConfiguration {
return this;
}
public Builder<T,V> onTimeoutThrow() {
- onTimeout = Maybe.<V>absent();
+ onTimeout = Maybe.absent();
return this;
}
public Builder<T,V> onUnmanagedReturn(V val) {
@@ -1139,7 +1172,7 @@ public class DependentConfiguration {
return this;
}
public Builder<T,V> onUnmanagedThrow() {
- onUnmanaged = Maybe.<V>absent();
+ onUnmanaged = Maybe.absent();
return this;
}
/** @since 0.7.0 included in case old behaviour of not checking
whether the entity is managed is required
@@ -1169,13 +1202,13 @@ public class DependentConfiguration {
.description("Waiting on sensor "+sensor.getName()+" from
"+source)
.tag("attributeWhenReady")
.tag(BrooklynTaskTags.TRANSIENT_TASK_TAG)
- .body(new WaitInTaskForAttributeReady<T,V>(this))
+ .body(new WaitInTaskForAttributeReady<>(this))
.build();
}
public V runNow() {
validate();
- return new WaitInTaskForAttributeReady<T,V>(this).call();
+ return new WaitInTaskForAttributeReady<>(this).call();
}
@SuppressWarnings({ "unchecked", "rawtypes" })
private void validate() {
@@ -1208,11 +1241,11 @@ public class DependentConfiguration {
}
@Beta
protected MultiBuilder(Iterable<? extends Entity> sources,
AttributeSensor<T> sensor, Predicate<? super T> readiness) {
- builder = new Builder<T,V>(null, sensor);
+ builder = new Builder<>(null, sensor);
builder.readiness(readiness);
for (Entity s : checkNotNull(sources, "sources")) {
- multiSource.add(new AttributeAndSensorCondition<T>(s, sensor,
readiness));
+ multiSource.add(new AttributeAndSensorCondition<>(s, sensor,
readiness));
}
this.name = "waiting on "+sensor.getName();
this.descriptionBase = "waiting on "+sensor.getName()+" "+readiness
@@ -1300,14 +1333,12 @@ public class DependentConfiguration {
} else {
return
Tasks.<V2>builder().displayName(name).description(descriptionBase)
.tag("attributeWhenReady")
- .body(new Callable<V2>() {
- @Override public V2 call() throws Exception {
- List<V> prePostProgress =
DynamicTasks.queue(parallelTask).get();
- return DynamicTasks.queue(
-
Tasks.<V2>builder().displayName("post-processing").description("Applying
"+postProcessFromMultiple)
-
.body(Functionals.callable(postProcessFromMultiple, prePostProgress))
- .build()).get();
- }
+ .body(() -> {
+ List<V> prePostProgress =
DynamicTasks.queue(parallelTask).get();
+ return DynamicTasks.queue(
+
Tasks.<V2>builder().displayName("post-processing").description("Applying
"+postProcessFromMultiple)
+
.body(Functionals.callable(postProcessFromMultiple, prePostProgress))
+ .build()).get();
})
.build();
}
diff --git
a/core/src/test/java/org/apache/brooklyn/core/entity/DependentConfigurationTest.java
b/core/src/test/java/org/apache/brooklyn/core/entity/DependentConfigurationTest.java
index ce61252c9d..5903976eaf 100644
---
a/core/src/test/java/org/apache/brooklyn/core/entity/DependentConfigurationTest.java
+++
b/core/src/test/java/org/apache/brooklyn/core/entity/DependentConfigurationTest.java
@@ -29,6 +29,7 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
+import com.google.common.base.Stopwatch;
import org.apache.brooklyn.api.entity.EntitySpec;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.sensor.AttributeSensor;
@@ -67,10 +68,10 @@ import com.google.common.util.concurrent.Callables;
public class DependentConfigurationTest extends BrooklynAppUnitTestSupport {
private static final Logger log =
LoggerFactory.getLogger(DependentConfigurationTest.class);
-
+
public static final int SHORT_WAIT_MS = 100;
public static final int TIMEOUT_MS = 30*1000;
-
+
private TestEntity entity;
private TestEntity entity2;
@@ -81,11 +82,11 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
entity = app.createAndManageChild(EntitySpec.create(TestEntity.class));
entity2 =
app.createAndManageChild(EntitySpec.create(TestEntity.class));
}
-
+
@Test
public void testTransform() throws Exception {
Task<Integer> t = DependentConfiguration.transform(
- new BasicTask<Integer>(Callables.returning(2)),
+ new BasicTask<Integer>(Callables.returning(2)),
incrementerFunction());
submit(t);
assertEquals(t.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
Integer.valueOf(3));
@@ -97,12 +98,12 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
return val + 1;
}};
}
-
+
@Test
public void testFormatString() throws Exception {
Task<String> t = DependentConfiguration.formatString("%s://%s:%d/",
"http",
- new BasicTask<String>(Callables.returning("localhost")),
+ new BasicTask<String>(Callables.returning("localhost")),
DependentConfiguration.transform(new
BasicTask<Integer>(Callables.returning(8080)), incrementerFunction()));
submit(t);
Assert.assertEquals(t.get(TIMEOUT_MS, TimeUnit.MILLISECONDS),
"http://localhost:8081/");
@@ -157,7 +158,7 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
public void testAttributeWhenReady() throws Exception {
final Task<String> t =
submit(DependentConfiguration.attributeWhenReady(entity, TestEntity.NAME));
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.NAME, "myval");
assertEquals(assertDoneEventually(t), "myval");
}
@@ -165,10 +166,10 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
@Test
public void testAttributeWhenReadyWithPredicate() throws Exception {
final Task<String> t =
submit(DependentConfiguration.attributeWhenReady(entity, TestEntity.NAME,
Predicates.equalTo("myval2")));
-
+
entity.sensors().set(TestEntity.NAME, "myval");
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.NAME, "myval2");
assertEquals(assertDoneEventually(t), "myval2");
}
@@ -177,7 +178,7 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
public void testAttributeWhenReadyWithPostProcessing() throws Exception {
final Task<String> t =
submit(DependentConfiguration.valueWhenAttributeReady(entity,
TestEntity.SEQUENCE, Functions.toStringFunction()));
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.SEQUENCE, 1);
assertEquals(assertDoneEventually(t), "1");
}
@@ -190,7 +191,7 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
.build());
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.SEQUENCE, 1);
assertEquals(assertDoneEventually(t), "1");
}
@@ -207,7 +208,7 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
}});
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.SEQUENCE, 1);
assertEquals(assertDoneEventually(t), "1");
}
@@ -219,7 +220,7 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
.abortIf(entity2, TestEntity.SEQUENCE, Predicates.equalTo(1))
.build());
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.NAME, "myval");
assertEquals(assertDoneEventually(t), "myval");
}
@@ -320,41 +321,41 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
}
@Test
- public void testAttributeWhenReadyAbortsWhenOnFireByDefault() {
+ public void testAttributeWhenReadyAbortsWhenOnFireImmediately() {
log.info("starting test "+JavaClassNames.niceClassAndMethod());
final Task<String> t = submit(DependentConfiguration.builder()
- .attributeWhenReady(entity, TestEntity.NAME)
+ .attributeWhenReady(entity, TestEntity.NAME).abortIfOnFire()
.build());
ServiceStateLogic.setExpectedState(entity, Lifecycle.ON_FIRE);
EntityAsserts.assertAttributeEqualsEventually(entity,
Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
-
+
try {
assertDoneEventually(t);
fail("Should have failed already!");
} catch (Throwable e) {
- if (e.toString().contains("Aborted waiting for ready"))
- return;
-
+ if (e.toString().contains("Aborted waiting for ready")) return;
+// if (e.toString().contains("Unsatisfied after")) return;
+
log.warn("Did not abort as expected: "+e, e);
Dumper.dumpInfo(entity);
-
+
throw Exceptions.propagate(e);
}
}
@Test(invocationCount=100, groups = "Integration")
public void testAttributeWhenReadyAbortsWhenOnfireByDefaultManyTimes() {
- testAttributeWhenReadyAbortsWhenOnFireByDefault();
+ testAttributeWhenReadyAbortsWhenOnFireImmediately();
}
-
+
@Test
- public void testAttributeWhenReadyAbortsWhenAlreadyOnFireByDefault()
throws Exception {
+ public void testAttributeWhenReadyAbortsWhenAlreadyOnFireImmediately()
throws Exception {
ServiceStateLogic.setExpectedState(entity, Lifecycle.ON_FIRE);
EntityAsserts.assertAttributeEqualsEventually(entity,
Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
-
+
final Task<String> t = submit(DependentConfiguration.builder()
- .attributeWhenReady(entity, TestEntity.NAME)
+ .attributeWhenReady(entity, TestEntity.NAME).abortIfOnFire()
.build());
try {
@@ -365,16 +366,36 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
}
}
+ @Test
+ public void testAttributeWhenReadyAbortsWhenAlreadyOnFireAfterMillis()
throws Exception {
+ ServiceStateLogic.setExpectedState(entity, Lifecycle.ON_FIRE);
+ EntityAsserts.assertAttributeEqualsEventually(entity,
Attributes.SERVICE_STATE_ACTUAL, Lifecycle.ON_FIRE);
+
+ Stopwatch timer = Stopwatch.createStarted();
+ final Task<String> t = submit(DependentConfiguration.builder()
+ .attributeWhenReady(entity,
TestEntity.NAME).timeoutIfOnFire(Duration.ZERO, Duration.millis(100))
+ .build());
+
+ try {
+ assertDoneEventually(t);
+ fail();
+ } catch (Exception e) {
+ if (e.toString().contains("Aborted waiting for ready")) return;
+// if (e.toString().contains("Unsatisfied after")) return;
+ Asserts.assertThat(timer, tt ->
Duration.of(tt).isLongerThan(Duration.millis(99)));
+ }
+ }
+
@Test
public void testListAttributeWhenReadyFromMultipleEntities() throws
Exception {
final Task<List<String>> t = submit(DependentConfiguration.builder()
.attributeWhenReadyFromMultiple(ImmutableList.of(entity,
entity2), TestEntity.NAME)
.build());
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.NAME, "myval");
assertNotDoneContinually(t);
-
+
entity2.sensors().set(TestEntity.NAME, "myval2");
assertEquals(ImmutableSet.copyOf(assertDoneEventually(t)),
ImmutableSet.of("myval", "myval2"));
}
@@ -384,11 +405,11 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
final Task<List<String>> t = submit(DependentConfiguration.builder()
.attributeWhenReadyFromMultiple(ImmutableList.of(entity,
entity2), TestEntity.NAME, StringPredicates.startsWith("myval"))
.build());
-
+
entity.sensors().set(TestEntity.NAME, "wrongval");
entity2.sensors().set(TestEntity.NAME, "wrongval2");
assertNotDoneContinually(t);
-
+
entity.sensors().set(TestEntity.NAME, "myval");
assertNotDoneContinually(t);
entity2.sensors().set(TestEntity.NAME, "myval2");
@@ -410,7 +431,7 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
}
}})
.build());
-
+
entity.sensors().set(TestEntity.SEQUENCE, 1);
entity2.sensors().set(TestEntity.SEQUENCE, 2);
assertEquals(assertDoneEventually(t), "1,2");
@@ -426,7 +447,7 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
}
});
}
-
+
private <T> T assertDoneEventually(final Task<T> t) throws Exception {
final AtomicReference<ExecutionException> exception = new
AtomicReference<ExecutionException>();
T result = Asserts.succeedsEventually(MutableMap.of("timeout",
Duration.FIVE_SECONDS), new Callable<T>() {
@@ -449,11 +470,11 @@ public class DependentConfigurationTest extends
BrooklynAppUnitTestSupport {
return result;
}
-
+
private <T> Task<T> submit(Task<T> task) {
return app.getExecutionContext().submit(task);
}
-
+
private <T> Task<T> submit(Callable<T> job) {
return app.getExecutionContext().submit(new BasicTask<T>(job));
}