Github user aledsage commented on a diff in the pull request: https://github.com/apache/brooklyn-server/pull/835#discussion_r142923334 --- Diff: core/src/test/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManagerTest.java --- @@ -170,4 +181,84 @@ public void run() { if (threadException.get() != null) throw threadException.get(); } + @Test + // same test as in PolicySubscriptionTest, but for entities / simpler + public void testSubscriptionReceivesInitialValueEventsInOrder() { + RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>(); + + entity.sensors().set(TestEntity.NAME, "myname"); + entity.sensors().set(TestEntity.SEQUENCE, 123); + entity.sensors().emit(TestEntity.MY_NOTIF, -1); + + // delivery should be in subscription order, so 123 then 456 + entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SEQUENCE, listener); + // wait for the above delivery - otherwise it might get dropped + Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> { + Asserts.assertSize(listener.getEvents(), 1); }); + entity.sensors().set(TestEntity.SEQUENCE, 456); + + // notifications don't have "initial value" so don't get -1 + entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.MY_NOTIF, listener); + // but do get 1, after 456 + entity.sensors().emit(TestEntity.MY_NOTIF, 1); + + // STOPPING and myname received, in subscription order, after everything else + entity.sensors().set(TestEntity.SERVICE_STATE_ACTUAL, Lifecycle.STOPPING); + entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.SERVICE_STATE_ACTUAL, listener); + entity.subscriptions().subscribe(ImmutableMap.of("notifyOfInitialValue", true), entity, TestEntity.NAME, listener); + + Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), new Runnable() { + @Override public void run() { + Asserts.assertEquals(listener.getEvents(), ImmutableList.of( + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 123), + new BasicSensorEvent<Integer>(TestEntity.SEQUENCE, entity, 456), + new BasicSensorEvent<Integer>(TestEntity.MY_NOTIF, entity, 1), + new BasicSensorEvent<Lifecycle>(TestEntity.SERVICE_STATE_ACTUAL, entity, Lifecycle.STOPPING), + new BasicSensorEvent<String>(TestEntity.NAME, entity, "myname")), + "actually got: "+listener.getEvents()); + }}); + } + + @Test + public void testNotificationOrderMatchesSetValueOrderWhenSynched() { + RecordingSensorEventListener<Object> listener = new RecordingSensorEventListener<>(); + + AtomicInteger count = new AtomicInteger(); + Runnable set = () -> { + synchronized (count) { + entity.sensors().set(TestEntity.SEQUENCE, count.incrementAndGet()); + } + }; + entity.subscriptions().subscribe(ImmutableMap.of(), entity, TestEntity.SEQUENCE, listener); + for (int i=0; i<10; i++) { + new Thread(set).start(); + } + + Asserts.succeedsEventually(MutableMap.of("timeout", Duration.seconds(5)), () -> { --- End diff -- As above, don't bother with bespoke timeouts. (Same below).
---