This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
The following commit(s) were added to refs/heads/master by this push:
new 2a56281088 tidy-up of attributeWhenReady options, and tests
2a56281088 is described below
commit 2a56281088ef0a0035196523e33bf4f8d57161a1
Author: Alex Heneveld <[email protected]>
AuthorDate: Fri Jul 19 11:39:55 2024 +0100
tidy-up of attributeWhenReady options, and tests
---
.../brooklyn/camp/brooklyn/ConfigYamlTest.java | 240 ++++++++++++++++++---
.../core/sensor/DependentConfiguration.java | 160 ++++++++------
2 files changed, 312 insertions(+), 88 deletions(-)
diff --git
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
index 8b819ba880..777310d6d1 100644
---
a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
+++
b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ConfigYamlTest.java
@@ -19,47 +19,42 @@
package org.apache.brooklyn.camp.brooklyn;
import com.google.common.annotations.Beta;
-import java.util.Map;
-
+import com.google.common.base.Joiner;
import com.google.common.base.Stopwatch;
-import org.apache.brooklyn.core.config.Sanitizer;
-import org.apache.brooklyn.core.entity.EntityAsserts;
-import org.apache.brooklyn.core.internal.BrooklynProperties;
-import org.apache.brooklyn.core.server.BrooklynServerConfig;
-import org.apache.brooklyn.util.guava.Maybe;
-import org.apache.brooklyn.util.internal.BrooklynSystemProperties;
-import org.apache.brooklyn.util.text.StringEscapes.JavaStringEscapes;
-import org.apache.brooklyn.util.yaml.Yamls;
-import org.testng.Assert;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertNull;
-import static org.testng.Assert.assertTrue;
-
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Iterables;
import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.core.config.ConfigKeys;
+import org.apache.brooklyn.core.config.Sanitizer;
+import org.apache.brooklyn.core.entity.Attributes;
import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
import org.apache.brooklyn.core.mgmt.internal.LocalManagementContext;
import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.server.BrooklynServerConfig;
import org.apache.brooklyn.core.test.entity.TestEntity;
import org.apache.brooklyn.test.Asserts;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
+import org.apache.brooklyn.util.guava.Maybe;
+import org.apache.brooklyn.util.text.StringEscapes.JavaStringEscapes;
import org.apache.brooklyn.util.time.Duration;
import org.apache.brooklyn.util.time.Time;
+import org.apache.brooklyn.util.yaml.Yamls;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import com.google.common.base.Joiner;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Iterables;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.testng.Assert.*;
public class ConfigYamlTest extends AbstractYamlTest {
@@ -436,7 +431,7 @@ public class ConfigYamlTest extends AbstractYamlTest {
}
@Test
- public void testAttributeWhenReadyOptions() throws Exception {
+ public void testAttributeWhenReadyOptionsBasic() throws Exception {
String yaml = Joiner.on("\n").join(
"services:",
"- type: org.apache.brooklyn.core.test.entity.TestEntity",
@@ -457,7 +452,7 @@ public class ConfigYamlTest extends AbstractYamlTest {
}
@Test
- public void testOtherEntityAttributeWhenReadyOptions() throws Exception {
+ public void testAttributeWhenReadyOptionsBasicOnOtherEntity() throws
Exception {
String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { timeout: 10ms } ] } ] }";
String v1 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { \"timeout\": \"10ms\" } ] } ] }";
@@ -482,10 +477,203 @@ public class ConfigYamlTest extends AbstractYamlTest {
entity2.sensors().set(TestEntity.NAME, "x");
EntityAsserts.assertConfigEquals(entity1, TestEntity.CONF_NAME, "x");
+ // and on fire
+ entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.ON_FIRE);
+ EntityAsserts.assertConfigEquals(entity1, TestEntity.CONF_NAME, "x");
+ entity2.sensors().remove(TestEntity.NAME);
+ sw = Stopwatch.createStarted();
+ Asserts.assertFailsWith(() ->
entity1.config().get(TestEntity.CONF_NAME),
+ Asserts.expectedFailureContainsIgnoreCase("Cannot resolve",
"$brooklyn:chain", " attributeWhenReady", "test.name", "0", "Resolving config
test.confName",
+// "Unsatisfied after ",
+ "Abort due to", "on-fire"));
+ Asserts.assertThat(Duration.of(sw.elapsed()), d ->
d.isShorterThan(Duration.millis(999)));
+
+ // and source code
Maybe<Object> rawV = entity1.config().getRaw(TestEntity.CONF_NAME);
Asserts.assertEquals(rawV.get().toString(), v1);
}
+ @Test
+ public void testAttributeWhenReadyOptionsTimeoutZero() throws Exception {
+ String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { timeout: 0 } ] } ] }";
+
+ String yaml = Joiner.on("\n").join(
+ "services:",
+ "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+ " brooklyn.config:",
+ " test.confName: "+v0,
+ "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+ " id: entity2");
+
+ final Entity app = createStartWaitAndLogApplication(yaml);
+ final TestEntity entity1 = (TestEntity)
Iterables.get(app.getChildren(), 0);
+ final TestEntity entity2 = (TestEntity)
Iterables.get(app.getChildren(), 1);
+
+ entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.RUNNING);
+ Stopwatch sw = Stopwatch.createStarted();
+ Asserts.assertFailsWith(() ->
entity1.config().get(TestEntity.CONF_NAME),
+ Asserts.expectedFailureContainsIgnoreCase("Cannot resolve",
"$brooklyn:chain", " attributeWhenReady", "test.name", "0", "Resolving config
test.confName",
+ "Waiting not permitted"));
+ Asserts.assertThat(Duration.of(sw.elapsed()), d ->
d.isShorterThan(Duration.millis(999)));
+
+ entity2.sensors().set(TestEntity.NAME, "");
+ sw = Stopwatch.createStarted();
+ Asserts.assertFailsWith(() ->
entity1.config().get(TestEntity.CONF_NAME),
+ Asserts.expectedFailureContainsIgnoreCase("Cannot resolve",
"$brooklyn:chain", " attributeWhenReady", "test.name", "0", "Resolving config
test.confName",
+ "Waiting not permitted"));
+ Asserts.assertThat(Duration.of(sw.elapsed()), d ->
d.isShorterThan(Duration.millis(999)));
+
+ entity2.sensors().set(TestEntity.NAME, "x");
+ EntityAsserts.assertConfigEquals(entity1, TestEntity.CONF_NAME, "x");
+
+ entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.ON_FIRE);
+ EntityAsserts.assertConfigEquals(entity1, TestEntity.CONF_NAME, "x");
+ entity2.sensors().remove(TestEntity.NAME);
+ // not aborted, just no timeout here
+ Asserts.assertFailsWith(() ->
entity1.config().get(TestEntity.CONF_NAME),
+ e -> {
+ Asserts.expectedFailureContainsIgnoreCase(e, "Cannot
resolve", "$brooklyn:chain", " attributeWhenReady", "test.name", "0",
"Resolving config test.confName",
+ "Waiting not permitted");
+ Asserts.expectedFailureDoesNotContainIgnoreCase(e,
+ "Abort due to", "on-fire",
+ "Unsatisfied after ");
+ return true;
+ });
+ }
+
+ @Test
+ public void testAttributeWhenReadyOptionsTimeoutIfDownTimesOut() throws
Exception {
+ String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { timeout: forever, timeout_if_down: 10ms,
abort_if_on_fire: false } ] } ] }";
+
+ String yaml = Joiner.on("\n").join(
+ "services:",
+ "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+ " brooklyn.config:",
+ " test.confName: "+v0,
+ "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+ " id: entity2");
+
+ final Entity app = createStartWaitAndLogApplication(yaml);
+ final TestEntity entity1 = (TestEntity)
Iterables.get(app.getChildren(), 0);
+ final TestEntity entity2 = (TestEntity)
Iterables.get(app.getChildren(), 1);
+
+ new Thread(()->{
+ Time.sleep(10);
+ entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.STOPPING);
+ }).start();
+
+ Stopwatch sw = Stopwatch.createStarted();
+ Asserts.assertFailsWith(() ->
entity1.config().get(TestEntity.CONF_NAME),
+ Asserts.expectedFailureContainsIgnoreCase("Cannot resolve",
"$brooklyn:chain", " attributeWhenReady", "test.name", "0", "Resolving config
test.confName",
+ "tighter timeout due to", "stopping"));
+ Asserts.assertThat(Duration.of(sw.elapsed()), d ->
d.isShorterThan(Duration.millis(999)));
+ }
+
+ @Test
+ public void
testAttributeWhenReadyOptionsTimeoutIfDownResetsAndAbortsIfOnFire() throws
Exception {
+ String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { timeout: forever, timeout_if_down: 10ms
} ] } ] }";
+
+ String yaml = Joiner.on("\n").join(
+ "services:",
+ "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+ " brooklyn.config:",
+ " test.confName: "+v0,
+ "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+ " id: entity2");
+
+ final Entity app = createStartWaitAndLogApplication(yaml);
+ final TestEntity entity1 = (TestEntity)
Iterables.get(app.getChildren(), 0);
+ final TestEntity entity2 = (TestEntity)
Iterables.get(app.getChildren(), 1);
+
+ entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.STOPPING);
+ Stopwatch sw = Stopwatch.createStarted();
+ new Thread(()->{
+ Time.sleep(Duration.millis(10));
+ entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.STOPPING);
+ entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.RUNNING); // will clear the timeout
+
+ Time.sleep(Duration.millis(10));
+ entity2.sensors().set(TestEntity.NAME, "x");
+ }).start();
+ EntityAsserts.assertConfigEquals(entity1, TestEntity.CONF_NAME, "x");
+ Asserts.assertThat(Duration.of(sw.elapsed()), d ->
d.isLongerThan(Duration.millis(19)));
+
+ entity2.sensors().remove(TestEntity.NAME);
+ sw = Stopwatch.createStarted();
+ new Thread(()->{
+ Time.sleep(Duration.millis(10));
+ entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.ON_FIRE);
+ }).start();
+ Asserts.assertFailsWith(() ->
entity1.config().get(TestEntity.CONF_NAME),
+ Asserts.expectedFailureContainsIgnoreCase("Cannot resolve",
"$brooklyn:chain", " attributeWhenReady", "test.name", "0", "Resolving config
test.confName",
+// "Unsatisfied after ",
+ "Abort due to", "on-fire"));
+ Asserts.assertThat(Duration.of(sw.elapsed()), d ->
d.isShorterThan(Duration.millis(999)));
+ }
+
+ @Test(groups="Integration") // because slow
+ public void testAttributeWhenReadyOptionsTimeoutIfDownResetsBetter()
throws Exception {
+ String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { timeout: forever, timeout_if_down: 1s,
abort_if_on_fire: false } ] } ] }";
+
+ String yaml = Joiner.on("\n").join(
+ "services:",
+ "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+ " brooklyn.config:",
+ " test.confName: "+v0,
+ "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+ " id: entity2");
+
+ final Entity app = createStartWaitAndLogApplication(yaml);
+ final TestEntity entity1 = (TestEntity)
Iterables.get(app.getChildren(), 0);
+ final TestEntity entity2 = (TestEntity)
Iterables.get(app.getChildren(), 1);
+
+ entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.STOPPING);
+ Stopwatch sw = Stopwatch.createStarted();
+ new Thread(()->{
+ Time.sleep(Duration.millis(100));
+ entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.STOPPING);
+
+ // comment these two lines out and it shoud fail
+ Time.sleep(Duration.millis(50));
+ entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.RUNNING); // will clear the timeout
+
+ Time.sleep(Duration.seconds(2));
+ entity2.sensors().set(TestEntity.NAME, "x");
+ }).start();
+ EntityAsserts.assertConfigEquals(entity1, TestEntity.CONF_NAME, "x");
+ Asserts.assertThat(Duration.of(sw.elapsed()), d ->
d.isLongerThan(Duration.millis(19)));
+ }
+
+ @Test
+ public void testAttributeWhenReadyOptionsAbortIfOnFireAndNoWait() throws
Exception {
+ String v0 = "{ $brooklyn:chain: [ $brooklyn:entity(\"entity2\"), {
attributeWhenReady: [ \"test.name\", { wait_for_truthy: false, " +
+ // these have no effect with wait_for_truthy: false
+ "timeout: 1s, abort_if_on_fire: true } ] } ] }";
+
+ String yaml = Joiner.on("\n").join(
+ "services:",
+ "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+ " brooklyn.config:",
+ " test.confName: "+v0,
+ "- type: org.apache.brooklyn.core.test.entity.TestEntity",
+ " id: entity2");
+
+ final Entity app = createStartWaitAndLogApplication(yaml);
+ final TestEntity entity1 = (TestEntity)
Iterables.get(app.getChildren(), 0);
+ final TestEntity entity2 = (TestEntity)
Iterables.get(app.getChildren(), 1);
+
+ Stopwatch sw = Stopwatch.createStarted();
+ EntityAsserts.assertConfigEquals(entity1, TestEntity.CONF_NAME, null);
+ Asserts.assertThat(Duration.of(sw.elapsed()), d ->
d.isShorterThan(Duration.millis(999)));
+
+ entity2.sensors().set(Attributes.SERVICE_STATE_ACTUAL,
Lifecycle.ON_FIRE);
+ EntityAsserts.assertConfigEquals(entity1, TestEntity.CONF_NAME, null);
+
+ entity2.sensors().set(TestEntity.NAME, "x");
+ EntityAsserts.assertConfigEquals(entity1, TestEntity.CONF_NAME, "x");
+ Asserts.assertThat(Duration.of(sw.elapsed()), d ->
d.isShorterThan(Duration.millis(999)));
+ }
+
@Test
public void testConfigGoodNumericCoercions() throws Exception {
String yaml = Joiner.on("\n").join(
diff --git
a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
index 39567f8116..e29cc3ee10 100644
---
a/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
+++
b/core/src/main/java/org/apache/brooklyn/core/sensor/DependentConfiguration.java
@@ -18,27 +18,15 @@
*/
package org.apache.brooklyn.core.sensor;
-import static com.google.common.base.Preconditions.checkNotNull;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Semaphore;
-import java.util.concurrent.TimeUnit;
-
-import javax.annotation.Nullable;
-
+import com.fasterxml.jackson.annotation.JsonAlias;
+import com.google.common.annotations.Beta;
+import com.google.common.base.*;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Iterables;
+import com.google.common.collect.Lists;
+import groovy.lang.Closure;
import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.mgmt.ExecutionContext;
-import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
-import org.apache.brooklyn.api.mgmt.Task;
-import org.apache.brooklyn.api.mgmt.TaskAdaptable;
-import org.apache.brooklyn.api.mgmt.TaskFactory;
+import org.apache.brooklyn.api.mgmt.*;
import org.apache.brooklyn.api.sensor.AttributeSensor;
import org.apache.brooklyn.api.sensor.SensorEvent;
import org.apache.brooklyn.api.sensor.SensorEventListener;
@@ -53,15 +41,7 @@ import
org.apache.brooklyn.util.collections.CollectionFunctionals;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
-import org.apache.brooklyn.util.core.task.BasicExecutionContext;
-import org.apache.brooklyn.util.core.task.BasicTask;
-import org.apache.brooklyn.util.core.task.DeferredSupplier;
-import org.apache.brooklyn.util.core.task.DynamicTasks;
-import org.apache.brooklyn.util.core.task.ImmediateSupplier;
-import org.apache.brooklyn.util.core.task.ParallelTask;
-import org.apache.brooklyn.util.core.task.TaskInternal;
-import org.apache.brooklyn.util.core.task.Tasks;
-import org.apache.brooklyn.util.core.task.ValueResolver;
+import org.apache.brooklyn.util.core.task.*;
import org.apache.brooklyn.util.exceptions.CompoundRuntimeException;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.NotManagedException;
@@ -77,21 +57,19 @@ import
org.apache.brooklyn.util.text.StringFunctions.RegexReplacer;
import org.apache.brooklyn.util.text.Strings;
import org.apache.brooklyn.util.time.CountdownTimer;
import org.apache.brooklyn.util.time.Duration;
+import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import com.google.common.annotations.Beta;
-import com.google.common.base.Function;
-import com.google.common.base.Functions;
-import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.base.Predicates;
-import com.google.common.base.Throwables;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.Iterables;
-import com.google.common.collect.Lists;
+import javax.annotation.Nullable;
+import java.util.*;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Semaphore;
+import java.util.concurrent.TimeUnit;
+import java.util.function.BiConsumer;
-import groovy.lang.Closure;
+import static com.google.common.base.Preconditions.checkNotNull;
/** Conveniences for making tasks which run in entity {@link
ExecutionContext}s, blocking on attributes from other entities, possibly
transforming those;
* these {@link Task} instances are typically passed in {@link
Entity#setConfig(ConfigKey, Object)}.
@@ -159,14 +137,19 @@ public class DependentConfiguration {
}
public static class AttributeWhenReadyOptions {
- Duration timeoutIfOnFire;
- Duration timeoutIfDown;
+ @JsonAlias("timeoutIfDown")
+ Duration timeout_if_down;
Duration timeout;
+ @JsonAlias("abortIfOnFire")
+ boolean abort_if_on_fire = true;
+ @JsonAlias("waitForTruthy")
+ boolean wait_for_truthy = true;
+
public static AttributeWhenReadyOptions defaultOptions() {
AttributeWhenReadyOptions result = new AttributeWhenReadyOptions();
- result.timeoutIfOnFire = Duration.ZERO;
- result.timeoutIfDown = Duration.ONE_MINUTE;
+ result.abort_if_on_fire = true;
+ result.timeout_if_down = Duration.ONE_MINUTE;
return result;
}
@@ -263,6 +246,7 @@ public class DependentConfiguration {
protected final AttributeSensor<T> sensor;
protected final Predicate<? super T> ready;
protected final List<AttributeAndSensorCondition<?>>
abortSensorConditions;
+ protected List<Pair<AttributeAndSensorCondition<Object>,Duration>>
timeoutIfTimeoutSensorConditions = null;
protected final String blockingDetails;
protected final Function<? super T,? extends V> postProcess;
protected final Duration timeout;
@@ -276,6 +260,7 @@ public class DependentConfiguration {
this.sensor = builder.sensor;
this.ready = builder.readiness;
this.abortSensorConditions = builder.abortSensorConditions;
+ this.timeoutIfTimeoutSensorConditions =
builder.timeoutIfTimeoutSensorConditions;
this.blockingDetails = builder.blockingDetails;
this.postProcess = builder.postProcess;
this.timeout = builder.timeout;
@@ -293,6 +278,7 @@ public class DependentConfiguration {
this.blockingDetails = blockingDetails;
this.timeout = Duration.PRACTICALLY_FOREVER;
+ this.timeoutIfTimeoutSensorConditions = null;
this.onTimeout = Maybe.absent();
this.ignoreUnmanaged = DEFAULT_IGNORE_UNMANAGED;
this.onUnmanaged = Maybe.absent();
@@ -320,7 +306,12 @@ public class DependentConfiguration {
// return immediately if either the ready predicate or the abort
conditions hold
if (ready(value)) return postProcess(value);
-
+
+ if (Duration.ZERO.equals(timeout)) {
+ if (onTimeout.isPresent()) return onTimeout.get();
+ throw new RuntimeTimeoutException("Waiting not permitted");
+ }
+
final List<Exception> abortionExceptions =
Lists.newCopyOnWriteArrayList();
long start = System.currentTimeMillis();
@@ -343,7 +334,7 @@ public class DependentConfiguration {
final LinkedList<T> publishedValues = new LinkedList<T>();
final Semaphore semaphore = new Semaphore(0); // could use
Exchanger
SubscriptionHandle subscription = null;
- List<SubscriptionHandle> abortSubscriptions = Lists.newArrayList();
+ List<SubscriptionHandle> thisWaitSubscriptions =
Lists.newArrayList();
try {
subscription = entity.subscriptions().subscribe(source,
sensor, new SensorEventListener<T>() {
@@ -351,8 +342,9 @@ public class DependentConfiguration {
synchronized (publishedValues) {
publishedValues.add(event.getValue()); }
semaphore.release();
}});
+
for (final AttributeAndSensorCondition abortCondition :
abortSensorConditions) {
-
abortSubscriptions.add(entity.subscriptions().subscribe(abortCondition.source,
abortCondition.sensor, new SensorEventListener<Object>() {
+
thisWaitSubscriptions.add(entity.subscriptions().subscribe(abortCondition.source,
abortCondition.sensor, new SensorEventListener<Object>() {
@Override public void onEvent(SensorEvent<Object>
event) {
if
(abortCondition.predicate.apply(event.getValue())) {
abortionExceptions.add(new Exception("Abort
due to "+abortCondition+": "+event.getValue()));
@@ -368,7 +360,36 @@ public class DependentConfiguration {
throw new CompoundRuntimeException("Aborted waiting for
ready value from "+source+" "+sensor.getName(), abortionExceptions);
}
- CountdownTimer timer = timeout!=null ?
timeout.countdownTimer() : null;
+ CountdownTimer timer = timeout!=null ?
timeout.countdownTimer() : Duration.PRACTICALLY_FOREVER.countdownTimer();
+
+ Map<Integer,Duration> customTimeouts = MutableMap.of();
+ BiConsumer<Integer,Object> checkValueAtIndex = (index, val) ->
{
+ Pair<AttributeAndSensorCondition<Object>, Duration>
timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
+ if (timeoutIfCondition.getLeft().predicate.apply(val)) {
+ if (!customTimeouts.containsKey(index)) {
+ // start timer from this point
+ customTimeouts.put(index,
timer.getDurationElapsed().add(timeoutIfCondition.getRight()));
+ }
+ } else {
+ customTimeouts.remove(index);
+ }
+ };
+
+ if (timeoutIfTimeoutSensorConditions!=null) {
+ for (int i=0; i<timeoutIfTimeoutSensorConditions.size();
i++) {
+ int index = i;
+ Pair<AttributeAndSensorCondition<Object>, Duration>
timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
+
+
thisWaitSubscriptions.add(entity.subscriptions().subscribe(timeoutIfCondition.getLeft().source,
timeoutIfCondition.getLeft().sensor, new SensorEventListener<Object>() {
+ @Override public void onEvent(SensorEvent<Object>
event) {
+ checkValueAtIndex.accept(index,
event.getValue());
+ }}));
+
+ Object val =
timeoutIfCondition.getLeft().source.getAttribute(timeoutIfCondition.getLeft().sensor);
+ checkValueAtIndex.accept(index, val);
+ }
+ }
+
Duration maxPeriod = ValueResolver.PRETTY_QUICK_WAIT;
Duration nextPeriod = ValueResolver.REAL_QUICK_PERIOD;
while (true) {
@@ -418,6 +439,22 @@ public class DependentConfiguration {
throw new CompoundRuntimeException("Aborted waiting
for ready value from "+source+" "+sensor.getName(), abortionExceptions);
}
+ if (!customTimeouts.isEmpty()) {
+ for (Map.Entry<Integer, Duration> entry :
customTimeouts.entrySet()) {
+ Integer index = entry.getKey();
+ Duration specialTimeout = entry.getValue();
+ Pair<AttributeAndSensorCondition<Object>,
Duration> timeoutIfCondition = timeoutIfTimeoutSensorConditions.get(index);
+ if
(timer.getDurationElapsed().isLongerThan(specialTimeout)) {
+ Object val =
timeoutIfCondition.getLeft().source.getAttribute(timeoutIfCondition.getLeft().sensor);
+ if
(timeoutIfCondition.getLeft().predicate.apply(val)) {
+ if (onTimeout.isPresent()) continue;
+ throw new
RuntimeTimeoutException("Unsatisfied after " + Duration.sinceUtc(start) + "
(tighter timeout due to " +
+ timeoutIfCondition.getLeft() + ",
with value " + val + ")");
+ }
+ }
+ }
+ }
+
nextPeriod =
nextPeriod.multiply(1.2).upperBound(maxPeriod);
}
if (LOG.isDebugEnabled()) LOG.debug("Attribute-ready for {} in
entity {}", sensor, source);
@@ -428,7 +465,7 @@ public class DependentConfiguration {
if (subscription != null) {
entity.subscriptions().unsubscribe(subscription);
}
- for (SubscriptionHandle handle : abortSubscriptions) {
+ for (SubscriptionHandle handle : thisWaitSubscriptions) {
entity.subscriptions().unsubscribe(handle);
}
}
@@ -887,7 +924,7 @@ public class DependentConfiguration {
/**
* Will wait for the attribute on the given entity, with default
behaviour:
* If that entity reports {@link Lifecycle#ON_FIRE} for its {@link
Attributes#SERVICE_STATE_ACTUAL} then it will abort;
- * If that entity is stopping or destroyed (see {@link
Builder#timeoutIfStoppingOrDestroyed(Duration)}),
+ * If that entity is stopping or destroyed (see {@link
Builder#timeoutIfDown(Duration)}),
* then it will timeout after 1 minute.
*/
public <T2> Builder<T2,T2> attributeWhenReady(Entity source,
AttributeSensor<T2> sensor) {
@@ -923,6 +960,7 @@ public class DependentConfiguration {
protected Predicate<? super T> readiness;
protected Function<? super T, ? extends V> postProcess;
protected List<AttributeAndSensorCondition<?>> abortSensorConditions =
Lists.newArrayList();
+ protected List<Pair<AttributeAndSensorCondition<Object>,Duration>>
timeoutIfTimeoutSensorConditions = null;
protected String blockingDetails;
protected Duration timeout;
protected Maybe<V> onTimeout = Maybe.absent();
@@ -974,11 +1012,9 @@ public class DependentConfiguration {
return this;
}
/** Causes the depender to timeout after the given time if {@link
Attributes#SERVICE_STATE_ACTUAL}
- * is one of {@link Lifecycle#STOPPING}, {@link Lifecycle#STOPPED}, or
{@link Lifecycle#DESTROYED}. */
- public Builder<T,V> timeoutIfStoppingOrDestroyed(Duration time) {
- timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL,
Predicates.equalTo(Lifecycle.STOPPING), time);
- timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL,
Predicates.equalTo(Lifecycle.STOPPED), time);
- timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL,
Predicates.equalTo(Lifecycle.DESTROYED), time);
+ * is not starting or running */
+ public Builder<T,V> timeoutIfDown(Duration time) {
+ timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL,
Predicates.in(MutableList.of(Lifecycle.STOPPING, Lifecycle.STOPPED,
Lifecycle.DESTROYED, Lifecycle.ON_FIRE, Lifecycle.CREATED, null)), time);
return this;
}
public Builder<T,V> options(AttributeWhenReadyOptions options) {
@@ -986,11 +1022,12 @@ public class DependentConfiguration {
if (options.timeout!=null) {
timeout(options.timeout);
}
- if (options.timeoutIfOnFire!=null) {
- if (Duration.ZERO.equals(options.timeoutIfOnFire))
abortIfOnFire();
- else timeoutIf(source, Attributes.SERVICE_STATE_ACTUAL,
Predicates.equalTo(Lifecycle.ON_FIRE), options.timeoutIfOnFire);
+ if (Boolean.TRUE.equals(options.abort_if_on_fire))
abortIfOnFire();
+ if (options.timeout_if_down !=null)
timeoutIfDown(options.timeout_if_down);
+
+ if (!options.wait_for_truthy) {
+ readiness = Predicates.alwaysTrue();
}
- if (options.timeoutIfDown!=null)
timeoutIfStoppingOrDestroyed(options.timeoutIfDown);
}
return this;
}
@@ -1006,9 +1043,8 @@ public class DependentConfiguration {
}
/** specifies the supplied timeout if the condition is met */
public <T2> Builder<T,V> timeoutIf(Entity source, AttributeSensor<T2>
sensor, Predicate<? super T2> predicate, Duration val) {
- // TODO these only apply to the target entity's state at
initialization time;
- // it would be nice to store these and recheck periodically in
case state changes subsequently (either from down to up or up to down)
- if (predicate.apply(source.sensors().get(sensor))) timeout(val);
+ if (timeoutIfTimeoutSensorConditions==null)
timeoutIfTimeoutSensorConditions = MutableList.of();
+ timeoutIfTimeoutSensorConditions.add(Pair.of(new
AttributeAndSensorCondition(source, sensor, predicate), val));
return this;
}
public Builder<T,V> onTimeoutReturn(V val) {