subscriptions: support notifyOfInitialValue If pass in notifyOfInitialValue=true when subscribing to a single entity:attribute, then the listener will be called with the current value (rather than waiting for the first change).
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/7ee7d410 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/7ee7d410 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/7ee7d410 Branch: refs/heads/master Commit: 7ee7d41026d180aeb4cf9206f86b7b0a7eb263d4 Parents: 0d9b3a8 Author: Aled Sage <[email protected]> Authored: Fri Sep 18 19:55:29 2015 +0100 Committer: Aled Sage <[email protected]> Committed: Mon Sep 21 14:33:20 2015 +0100 ---------------------------------------------------------------------- .../apache/brooklyn/api/entity/EntityLocal.java | 13 +++++- .../brooklyn/api/mgmt/SubscriptionContext.java | 2 +- .../brooklyn/core/entity/AbstractEntity.java | 7 ++++ .../mgmt/internal/BasicSubscriptionContext.java | 4 +- .../mgmt/internal/LocalSubscriptionManager.java | 41 ++++++++++++++++++- .../core/mgmt/internal/SubscriptionTracker.java | 10 ++++- .../core/objs/AbstractEntityAdjunct.java | 8 ++++ .../brooklyn/enricher/stock/UpdatingMap.java | 4 +- .../core/entity/EntitySubscriptionTest.java | 43 ++++++++++++++++++++ .../policy/basic/PolicySubscriptionTest.java | 30 ++++++++++++++ 10 files changed, 153 insertions(+), 9 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java b/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java index 5533949..7e5e963 100644 --- a/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java +++ b/api/src/main/java/org/apache/brooklyn/api/entity/EntityLocal.java @@ -115,7 +115,18 @@ public interface EntityLocal extends Entity { // FIXME remove from interface? @Beta <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener); - + + /** + * Allow us to subscribe to data from a {@link Sensor} on another entity. + * + * @return a subscription id which can be used to unsubscribe + * + * @see SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener) + */ + // FIXME remove from interface? + @Beta + <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener); + /** @see SubscriptionManager#subscribeToChildren(Map, Entity, Sensor, SensorEventListener) */ // FIXME remove from interface? @Beta http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java ---------------------------------------------------------------------- diff --git a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java index 7b4e6e7..3328b1a 100644 --- a/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java +++ b/api/src/main/java/org/apache/brooklyn/api/mgmt/SubscriptionContext.java @@ -34,7 +34,7 @@ public interface SubscriptionContext { /** * As {@link SubscriptionManager#subscribe(Map, Entity, Sensor, SensorEventListener)} with default subscription parameters for this context */ - <T> SubscriptionHandle subscribe(Map<String, Object> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener); + <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener); /** @see #subscribe(Map, Entity, Sensor, SensorEventListener) */ <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java index 5dc110d..03254dd 100644 --- a/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java +++ b/core/src/main/java/org/apache/brooklyn/core/entity/AbstractEntity.java @@ -1337,6 +1337,13 @@ public abstract class AbstractEntity extends AbstractBrooklynObject implements E return getSubscriptionTracker().subscribe(producer, sensor, listener); } + /** @see EntityLocal#subscribe */ + @Override + @Beta + public <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { + return getSubscriptionTracker().subscribe(flags, producer, sensor, listener); + } + /** @see EntityLocal#subscribeToChildren */ @Override public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java index 5c38b81..d821c4e 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/BasicSubscriptionContext.java @@ -70,7 +70,7 @@ public class BasicSubscriptionContext implements SubscriptionContext { } @SuppressWarnings("rawtypes") - public <T> SubscriptionHandle subscribe(Map<String, Object> newFlags, Entity producer, Sensor<T> sensor, Closure c) { + public <T> SubscriptionHandle subscribe(Map<String, ?> newFlags, Entity producer, Sensor<T> sensor, Closure c) { return subscribe(newFlags, producer, sensor, toSensorEventListener(c)); } @@ -80,7 +80,7 @@ public class BasicSubscriptionContext implements SubscriptionContext { } @Override - public <T> SubscriptionHandle subscribe(Map<String, Object> newFlags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { + public <T> SubscriptionHandle subscribe(Map<String, ?> newFlags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { Map<String,Object> subscriptionFlags = Maps.newLinkedHashMap(flags); if (newFlags != null) subscriptionFlags.putAll(newFlags); return manager.subscribe(subscriptionFlags, producer, sensor, listener); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java index 6ea94a1..7743995 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java @@ -37,10 +37,13 @@ import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.mgmt.ExecutionManager; import org.apache.brooklyn.api.mgmt.SubscriptionHandle; import org.apache.brooklyn.api.mgmt.SubscriptionManager; +import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.api.sensor.Sensor; import org.apache.brooklyn.api.sensor.SensorEvent; import org.apache.brooklyn.api.sensor.SensorEventListener; import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.sensor.BasicSensorEvent; +import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.task.BasicExecutionManager; import org.apache.brooklyn.util.core.task.SingleThreadedScheduler; import org.apache.brooklyn.util.text.Identifiers; @@ -90,7 +93,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager { } @SuppressWarnings("unchecked") - protected synchronized <T> SubscriptionHandle subscribe(Map<String, Object> flags, Subscription<T> s) { + protected synchronized <T> SubscriptionHandle subscribe(Map<String, Object> flags, final Subscription<T> s) { Entity producer = s.producer; Sensor<T> sensor= s.sensor; s.subscriber = getSubscriber(flags, s); @@ -105,6 +108,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager { s.subscriberExecutionManagerTagSupplied = false; } s.eventFilter = (Predicate<SensorEvent<T>>) flags.remove("eventFilter"); + boolean notifyOfInitialValue = Boolean.TRUE.equals(flags.remove("notifyOfInitialValue")); s.flags = flags; if (LOG.isDebugEnabled()) LOG.debug("Creating subscription {} for {} on {} {} in {}", new Object[] {s.id, s.subscriber, producer, sensor, this}); @@ -116,6 +120,41 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager { if (!s.subscriberExecutionManagerTagSupplied && s.subscriberExecutionManagerTag!=null) { ((BasicExecutionManager) em).setTaskSchedulerForTag(s.subscriberExecutionManagerTag, SingleThreadedScheduler.class); } + + if (notifyOfInitialValue) { + if (producer == null) { + LOG.warn("Cannot notifyOfInitialValue for subscription with wildcard producer: "+s); + } else if (sensor == null) { + LOG.warn("Cannot notifyOfInitialValue for subscription with wilcard sensor: "+s); + } else if (!(sensor instanceof AttributeSensor)) { + LOG.warn("Cannot notifyOfInitialValue for subscription with non-attribute sensor: "+s); + } else { + if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s}); + Map<String, Object> tagsMap = MutableMap.of("tag", s.subscriberExecutionManagerTag); + em.submit(tagsMap, new Runnable() { + @Override + public String toString() { + return "LSM.publishInitialValue("+s.producer+", "+s.sensor+")"; + } + public void run() { + Object val = s.producer.getAttribute((AttributeSensor<?>) s.sensor); + @SuppressWarnings("rawtypes") // TODO s.listener.onEvent gives compilation error if try to use <T> + SensorEvent event = new BasicSensorEvent(s.sensor, s.producer, val); + if (s.eventFilter!=null && !s.eventFilter.apply(event)) + return; + try { + s.listener.onEvent(event); + } catch (Throwable t) { + if (event!=null && event.getSource()!=null && Entities.isNoLongerManaged(event.getSource())) { + LOG.debug("Error processing initial-value subscription to "+LocalSubscriptionManager.this+", after entity unmanaged: "+t, t); + } else { + LOG.warn("Error processing initial-value subscription to "+LocalSubscriptionManager.this+": "+t, t); + } + } + }}); + } + } + return s; } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java index 2faad3a..3d5793c 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/SubscriptionTracker.java @@ -19,6 +19,7 @@ package org.apache.brooklyn.core.mgmt.internal; import java.util.Collection; +import java.util.Map; import org.apache.brooklyn.api.entity.Entity; import org.apache.brooklyn.api.entity.Group; @@ -29,6 +30,7 @@ import org.apache.brooklyn.api.sensor.SensorEventListener; import com.google.common.collect.HashMultimap; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.SetMultimap; /** @@ -57,13 +59,17 @@ public class SubscriptionTracker { /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */ public <T> SubscriptionHandle subscribe(Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { - SubscriptionHandle handle = context.subscribe(producer, sensor, listener); + return subscribe(ImmutableMap.<String, Object>of(), producer, sensor, listener); + } + + public <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { + SubscriptionHandle handle = context.subscribe(flags, producer, sensor, listener); synchronized (subscriptions) { subscriptions.put(producer, handle); } return handle; } - + /** @see SubscriptionContext#subscribeToChildren(Entity, Sensor, SensorEventListener) */ public <T> SubscriptionHandle subscribeToChildren(Entity parent, Sensor<T> sensor, SensorEventListener<? super T> listener) { SubscriptionHandle handle = context.subscribeToChildren(parent, sensor, listener); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java index e85cc73..fb71901 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/AbstractEntityAdjunct.java @@ -392,6 +392,14 @@ public abstract class AbstractEntityAdjunct extends AbstractBrooklynObject imple } @VisibleForTesting //intended as protected, meant for subclasses + @Beta + /** @see SubscriptionContext#subscribe(Map, Entity, Sensor, SensorEventListener) */ + public <T> SubscriptionHandle subscribe(Map<String, ?> flags, Entity producer, Sensor<T> sensor, SensorEventListener<? super T> listener) { + if (!checkCanSubscribe()) return null; + return getSubscriptionTracker().subscribe(flags, producer, sensor, listener); + } + + @VisibleForTesting //intended as protected, meant for subclasses /** @see SubscriptionContext#subscribe(Entity, Sensor, SensorEventListener) */ public <T> SubscriptionHandle subscribeToMembers(Group producerGroup, Sensor<T> sensor, SensorEventListener<? super T> listener) { if (!checkCanSubscribe(producerGroup)) return null; http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java b/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java index b09b6d6..43aec92 100644 --- a/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java +++ b/core/src/main/java/org/apache/brooklyn/enricher/stock/UpdatingMap.java @@ -36,6 +36,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.google.common.base.Function; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Maps; import com.google.common.reflect.TypeToken; @@ -102,8 +103,7 @@ public class UpdatingMap<S,TKey,TVal> extends AbstractEnricher implements Sensor this.computing = (Function) getRequiredConfig(COMPUTING); this.removingIfResultIsNull = getConfig(REMOVING_IF_RESULT_IS_NULL); - subscribe(entity, sourceSensor, this); - onUpdated(); + subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, sourceSensor, this); } @Override http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java index 8b8d244..620d8e0 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/EntitySubscriptionTest.java @@ -34,12 +34,15 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; public class EntitySubscriptionTest { // TODO Duplication between this and PolicySubscriptionTest + private static final long SHORT_WAIT_MS = 100; + private SimulatedLocation loc; private TestApplication app; private TestEntity entity; @@ -237,4 +240,44 @@ public class EntitySubscriptionTest { } }}); } + + @Test + public void testSubscriptionReceivesInitialValueEvents() { + observedEntity.sensors().set(TestEntity.SEQUENCE, 123); + observedEntity.sensors().set(TestEntity.NAME, "myname"); + + entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.SEQUENCE, listener); + entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.NAME, listener); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of( + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, observedEntity, 123), + new BasicSensorEvent<String>(TestEntity.NAME, observedEntity, "myname"))); + }}); + } + + + @Test + public void testSubscriptionNotReceivesInitialValueEventsByDefault() { + observedEntity.sensors().set(TestEntity.SEQUENCE, 123); + observedEntity.sensors().set(TestEntity.NAME, "myname"); + + entity.subscribe(observedEntity, TestEntity.SEQUENCE, listener); + entity.subscribe(observedEntity, TestEntity.NAME, listener); + + Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() { + @Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of()); + }}); + } + + // TODO A visual inspection test that we get a log.warn telling us we can't get the initial-value + @Test + public void testSubscriptionForInitialValueWhenNotValid() { + entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, TestEntity.MY_NOTIF, listener); + entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), observedEntity, null, listener); + entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, TestEntity.NAME, listener); + entity.subscribe(ImmutableMap.of("notifyOfInitialValue", true), null, null, listener); + } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/7ee7d410/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java index fa7333f..ab5bc4a 100644 --- a/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/policy/basic/PolicySubscriptionTest.java @@ -33,6 +33,7 @@ import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport { @@ -119,5 +120,34 @@ public class PolicySubscriptionTest extends BrooklynAppUnitTestSupport { new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, otherEntity, 456))); }}); } + + @Test + public void testSubscriptionReceivesInitialValueEvents() { + entity.sensors().set(TestEntity.SEQUENCE, 123); + entity.sensors().set(TestEntity.NAME, "myname"); + + policy.subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener); + policy.subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener); + + Asserts.succeedsEventually(new Runnable() { + @Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of( + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 123), + new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname"))); + }}); + } + @Test + public void testSubscriptionNotReceivesInitialValueEventsByDefault() { + entity.sensors().set(TestEntity.SEQUENCE, 123); + entity.sensors().set(TestEntity.NAME, "myname"); + + policy.subscribe(entity, TestEntity.SEQUENCE, listener); + policy.subscribe(entity, TestEntity.NAME, listener); + + Asserts.succeedsContinually(ImmutableMap.of("timeout", SHORT_WAIT_MS), new Runnable() { + @Override public void run() { + assertEquals(listener.getEvents(), ImmutableList.of()); + }}); + } }
