ServiceFailureDetector yaml tests, and fix
Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/c856cd15 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/c856cd15 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/c856cd15 Branch: refs/heads/master Commit: c856cd15112c046ae5f0c8b578b84a213b51bcc0 Parents: bcb9689 Author: Aled Sage <aled.s...@gmail.com> Authored: Wed Nov 16 09:51:43 2016 +0000 Committer: Aled Sage <aled.s...@gmail.com> Committed: Mon Nov 28 21:11:48 2016 +0000 ---------------------------------------------------------------------- .../ServiceFailureDetectorYamlTest.java | 189 +++++++++++++++++++ .../core/sensor/SensorEventPredicates.java | 81 ++++++++ .../entity/RecordingSensorEventListener.java | 23 +++ .../policy/ha/ServiceFailureDetector.java | 63 ++++--- 4 files changed, 329 insertions(+), 27 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c856cd15/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ServiceFailureDetectorYamlTest.java ---------------------------------------------------------------------- diff --git a/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ServiceFailureDetectorYamlTest.java b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ServiceFailureDetectorYamlTest.java new file mode 100644 index 0000000..4374c2b --- /dev/null +++ b/camp/camp-brooklyn/src/test/java/org/apache/brooklyn/camp/brooklyn/ServiceFailureDetectorYamlTest.java @@ -0,0 +1,189 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.camp.brooklyn; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertNotNull; + +import java.util.Map; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.sensor.Enricher; +import org.apache.brooklyn.config.ConfigKey; +import org.apache.brooklyn.core.entity.RecordingSensorEventListener; +import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ServiceNotUpLogic; +import org.apache.brooklyn.core.sensor.SensorEventPredicates; +import org.apache.brooklyn.core.test.entity.TestEntity; +import org.apache.brooklyn.policy.ha.HASensors; +import org.apache.brooklyn.policy.ha.ServiceFailureDetector; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import com.google.common.base.Joiner; +import com.google.common.base.Predicates; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Iterables; + +@Test +public class ServiceFailureDetectorYamlTest extends AbstractYamlTest { + + @SuppressWarnings("unused") + private static final Logger log = LoggerFactory.getLogger(ServiceFailureDetectorYamlTest.class); + + static final String INDICATOR_KEY_1 = "test-indicator-1"; + + static final String appId = "my-app"; + static final String appVersion = "1.2.3"; + static final String appVersionedId = appId + ":" + appVersion; + + static final String catalogYamlSimple = Joiner.on("\n").join( + "brooklyn.catalog:", + " id: " + appId, + " version: " + appVersion, + " itemType: entity", + " item:", + " type: " + TestEntity.class.getName(), + " brooklyn.enrichers:", + " - type: " + ServiceFailureDetector.class.getName()); + + static final String catalogYamlWithDsl = Joiner.on("\n").join( + "brooklyn.catalog:", + " id: my-app", + " version: 1.2.3", + " itemType: entity", + " item:", + " services:", + " - type: " + TestEntity.class.getName(),//FailingEntity.class.getName(), + " brooklyn.parameters:", + " - name: custom.stabilizationDelay", + " type: " + Duration.class.getName(), + " default: 1ms", + " - name: custom.republishTime", + " type: " + Duration.class.getName(), + " default: 1m", + " brooklyn.enrichers:", + " - type: " + ServiceFailureDetector.class.getName(), + " brooklyn.config:", + " serviceOnFire.stabilizationDelay: $brooklyn:config(\"custom.stabilizationDelay\")", + " entityFailed.stabilizationDelay: $brooklyn:config(\"custom.stabilizationDelay\")", + " entityRecovered.stabilizationDelay: $brooklyn:config(\"custom.stabilizationDelay\")", + " entityFailed.republishTime: $brooklyn:config(\"custom.republishTime\")"); + + static final String catalogYamlWithDslReferenceParentDefault = Joiner.on("\n").join( + "brooklyn.catalog:", + " id: my-app", + " version: 1.2.3", + " itemType: entity", + " item:", + " brooklyn.parameters:", + " - name: custom.stabilizationDelay", + " type: " + Duration.class.getName(), + " default: 1ms", + " - name: custom.republishTime", + " type: " + Duration.class.getName(), + " default: 1m", + " services:", + " - type: " + TestEntity.class.getName(),//FailingEntity.class.getName(), + " brooklyn.enrichers:", + " - type: " + ServiceFailureDetector.class.getName(), + " brooklyn.config:", + " serviceOnFire.stabilizationDelay: $brooklyn:config(\"custom.stabilizationDelay\")", + " entityFailed.stabilizationDelay: $brooklyn:config(\"custom.stabilizationDelay\")", + " entityRecovered.stabilizationDelay: $brooklyn:config(\"custom.stabilizationDelay\")", + " entityFailed.republishTime: $brooklyn:config(\"custom.republishTime\")"); + + @Test + public void testDefaults() throws Exception { + runTest(catalogYamlSimple, appVersionedId); + } + + @Test + public void testWithDslConfig() throws Exception { + Entity app = runTest(catalogYamlWithDsl, appVersionedId); + + TestEntity newEntity = (TestEntity) Iterables.getOnlyElement(app.getChildren()); + ServiceFailureDetector newEnricher = assertHasEnricher(newEntity, ServiceFailureDetector.class); + assertEnricherConfigMatchesDsl(newEnricher); + } + + @Test + public void testWithDslConfigReferenceParentDefault() throws Exception { + Entity app = runTest(catalogYamlWithDslReferenceParentDefault, appVersionedId); + + TestEntity newEntity = (TestEntity) Iterables.getOnlyElement(app.getChildren()); + ServiceFailureDetector newEnricher = assertHasEnricher(newEntity, ServiceFailureDetector.class); + assertEnricherConfigMatchesDsl(newEnricher); + } + + protected Entity runTest(String catalogYaml, String appId) throws Exception { + addCatalogItems(catalogYaml); + + String appYaml = Joiner.on("\n").join( + "services:", + "- type: " + appId); + Entity app = createStartWaitAndLogApplication(appYaml); + TestEntity entity = (TestEntity) Iterables.getOnlyElement(app.getChildren()); + assertHasEnricher(entity, ServiceFailureDetector.class); + + // Confirm ServiceFailureDetector triggers event + RecordingSensorEventListener<Object> listener = subscribeToHaSensors(entity); + + ServiceNotUpLogic.updateNotUpIndicator(entity, INDICATOR_KEY_1, "Simulate a problem"); + listener.assertHasEventEventually(SensorEventPredicates.sensorEqualTo(HASensors.ENTITY_FAILED)); + listener.assertEventCount(1); + listener.clearEvents(); + + ServiceNotUpLogic.clearNotUpIndicator(entity, INDICATOR_KEY_1); + listener.assertHasEventEventually(SensorEventPredicates.sensorEqualTo(HASensors.ENTITY_RECOVERED)); + listener.assertEventCount(1); + + return app; + } + + protected static <T extends Enricher> T assertHasEnricher(Entity entity, Class<T> enricherClazz) { + Enricher enricher = Iterables.find(entity.enrichers(), Predicates.instanceOf(enricherClazz)); + assertNotNull(enricher); + return enricherClazz.cast(enricher); + } + + protected static void assertEnricherConfig(Enricher enricher, Map<? extends ConfigKey<?>, ?> expectedVals) { + for (Map.Entry<? extends ConfigKey<?>, ?> entry : expectedVals.entrySet()) { + ConfigKey<?> key = entry.getKey(); + Object actual = enricher.config().get(key); + assertEquals(actual, entry.getValue(), "key="+key+"; expected="+entry.getValue()+"; actual="+actual); + } + } + + protected static void assertEnricherConfigMatchesDsl(Enricher enricher) { + assertEnricherConfig(enricher, ImmutableMap.<ConfigKey<?>, Object>of( + ServiceFailureDetector.ENTITY_FAILED_STABILIZATION_DELAY, Duration.ONE_MILLISECOND, + ServiceFailureDetector.SERVICE_ON_FIRE_STABILIZATION_DELAY, Duration.ONE_MILLISECOND, + ServiceFailureDetector.ENTITY_RECOVERED_STABILIZATION_DELAY, Duration.ONE_MILLISECOND, + ServiceFailureDetector.ENTITY_FAILED_REPUBLISH_TIME, Duration.ONE_MINUTE)); + } + + protected static RecordingSensorEventListener<Object> subscribeToHaSensors(Entity entity) { + RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>(); + entity.subscriptions().subscribe(entity, HASensors.ENTITY_RECOVERED, listener); + entity.subscriptions().subscribe(entity, HASensors.ENTITY_FAILED, listener); + return listener; + } +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c856cd15/core/src/main/java/org/apache/brooklyn/core/sensor/SensorEventPredicates.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/sensor/SensorEventPredicates.java b/core/src/main/java/org/apache/brooklyn/core/sensor/SensorEventPredicates.java new file mode 100644 index 0000000..00eb996 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/sensor/SensorEventPredicates.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.sensor; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.sensor.Sensor; +import org.apache.brooklyn.api.sensor.SensorEvent; +import org.apache.brooklyn.util.guava.SerializablePredicate; + +import com.google.common.base.Predicate; +import com.google.common.base.Predicates; + +public class SensorEventPredicates { + + public static Predicate<SensorEvent<?>> sensorEqualTo(final Sensor<?> val) { + return sensorSatisfies(Predicates.<Sensor<?>>equalTo(val)); + } + + public static Predicate<SensorEvent<?>> sensorSatisfies(final Predicate<? super Sensor<?>> condition) { + return new SensorSatisfies(condition); + } + + protected static class SensorSatisfies implements SerializablePredicate<SensorEvent<?>> { + private static final long serialVersionUID = -3585200249520308941L; + + protected final Predicate<? super Sensor<?>> condition; + protected SensorSatisfies(Predicate<? super Sensor<?>> condition) { + this.condition = condition; + } + @Override + public boolean apply(@Nullable SensorEvent<?> input) { + return (input != null) && condition.apply(input.getSensor()); + } + @Override + public String toString() { + return "sensorSatisfies("+condition+")"; + } + } + + public static <T> Predicate<SensorEvent<T>> valueEqualTo(final T val) { + return valueSatisfies(Predicates.equalTo(val)); + } + + public static <T> Predicate<SensorEvent<T>> valueSatisfies(final Predicate<? super T> condition) { + return new ValueSatisfies<T>(condition); + } + + protected static class ValueSatisfies<T> implements SerializablePredicate<SensorEvent<T>> { + private static final long serialVersionUID = 2805443606039228221L; + + protected final Predicate<? super T> condition; + protected ValueSatisfies(Predicate<? super T> condition) { + this.condition = condition; + } + @Override + public boolean apply(@Nullable SensorEvent<T> input) { + return (input != null) && condition.apply(input.getValue()); + } + @Override + public String toString() { + return "valueSatisfies("+condition+")"; + } + } +} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c856cd15/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java b/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java index e0c4ed0..c54a0fb 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/RecordingSensorEventListener.java @@ -28,10 +28,13 @@ import java.util.Objects; import org.apache.brooklyn.api.mgmt.Task; import org.apache.brooklyn.api.sensor.SensorEvent; import org.apache.brooklyn.api.sensor.SensorEventListener; +import org.apache.brooklyn.test.Asserts; import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.core.task.Tasks; +import org.testng.Assert; import com.google.common.base.Function; +import com.google.common.base.Predicate; import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; import com.google.common.collect.Lists; @@ -98,6 +101,26 @@ public class RecordingSensorEventListener<T> implements SensorEventListener<T>, .transform(new GetValueFunction<T>()); } + public void assertHasEvent(Predicate<? super SensorEvent<T>> filter) { + for (SensorEvent<T> event : events) { + if (filter.apply(event)) { + return; + } + } + Assert.fail("No event matching filter "+ filter); + } + + public void assertHasEventEventually(final Predicate<? super SensorEvent<T>> filter) { + Asserts.succeedsEventually(new Runnable() { + public void run() { + assertHasEvent(filter); + }}); + } + + public void assertEventCount(int expected) { + Assert.assertEquals(events.size(), expected, "events="+events); + } + /** * Clears all events recorded by the listener. */ http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/c856cd15/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java ---------------------------------------------------------------------- diff --git a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java index d36f25a..5c5aaeb 100644 --- a/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java +++ b/policy/src/main/java/org/apache/brooklyn/policy/ha/ServiceFailureDetector.java @@ -301,37 +301,46 @@ public class ServiceFailureDetector extends ServiceStateLogic.ComputeServiceStat return description; } - @SuppressWarnings({ "unchecked", "rawtypes" }) + @SuppressWarnings({ "rawtypes" }) protected void recomputeAfterDelay(long delay) { - if (isRunning() && executorQueued.compareAndSet(false, true)) { - long now = System.currentTimeMillis(); - delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now)); - if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay); - - Runnable job = new Runnable() { - @Override public void run() { - try { - executorTime = System.currentTimeMillis(); - executorQueued.set(false); + // TODO Execute in same thread as other onEvent calls are done in (i.e. same conceptually + // single-threaded executor as the subscription-manager will use). + // + // TODO Disabling the use of executorQueued check - it was causing assertions to fail that + // we'd triggered the ENTITY_FAILED/ENTITY_RECOVERED. Previously used: + // if (executorQueued.compareAndSet(false, true)) { + // My guess is that the next call to onEvent() didn't always call recomputeAfterDelay with + // the recalculated desired delay, as desired by the skipped call. But not sure why. + + if (!isRunning()) return; + + long now = System.currentTimeMillis(); + delay = Math.max(0, Math.max(delay, (executorTime + MIN_PERIOD_BETWEEN_EXECS_MILLIS) - now)); + if (LOG.isTraceEnabled()) LOG.trace("{} scheduling publish in {}ms", this, delay); + + Runnable job = new Runnable() { + @Override public void run() { + try { + executorTime = System.currentTimeMillis(); + executorQueued.set(false); - onEvent(null); - - } catch (Exception e) { - if (isRunning()) { - LOG.error("Error in enricher "+this+": "+e, e); - } else { - if (LOG.isDebugEnabled()) LOG.debug("Error in enricher "+this+" (but no longer running): "+e, e); - } - } catch (Throwable t) { - LOG.error("Error in enricher "+this+": "+t, t); - throw Exceptions.propagate(t); + onEvent(null); + + } catch (Exception e) { + if (isRunning()) { + LOG.error("Error in enricher "+this+": "+e, e); + } else { + if (LOG.isDebugEnabled()) LOG.debug("Error in enricher "+this+" (but no longer running): "+e, e); } + } catch (Throwable t) { + LOG.error("Error in enricher "+this+": "+t, t); + throw Exceptions.propagate(t); } - }; - - ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job)); - ((EntityInternal)entity).getExecutionContext().submit(task); - } + } + }; + + ScheduledTask task = new ScheduledTask(MutableMap.of("delay", Duration.of(delay, TimeUnit.MILLISECONDS)), new BasicTask(job)); + ((EntityInternal)entity).getExecutionContext().submit(task); } private String getTimeStringSince(Long time) {