Issue 327: fix ConcurrentModificationException in subscriptions

Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/cfce4732
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/cfce4732
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/cfce4732

Branch: refs/heads/0.4.0
Commit: cfce47322c63fcf023b90223de36a9e3537527d6
Parents: 114e99f
Author: Aled Sage <[email protected]>
Authored: Thu Sep 27 23:34:44 2012 +0100
Committer: Aled Sage <[email protected]>
Committed: Thu Sep 27 23:34:44 2012 +0100

----------------------------------------------------------------------
 .../internal/LocalSubscriptionManager.java      |   2 +-
 .../LocalSubscriptionManagerTest.groovy         |  91 -----------
 .../internal/LocalSubscriptionManagerTest.java  | 154 +++++++++++++++++++
 3 files changed, 155 insertions(+), 92 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cfce4732/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java b/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java
index cfe49e8..7d48850 100644
--- a/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java
+++ b/core/src/main/java/brooklyn/management/internal/LocalSubscriptionManager.java
@@ -207,7 +207,7 @@ public class LocalSubscriptionManager implements SubscriptionManager {
         return (Set<SubscriptionHandle>) ((Set<?>) elvis(subscriptionsBySubscriber.get(subscriber), Collections.emptySet()));
     }
 
-    public Set<SubscriptionHandle> getSubscriptionsForEntitySensor(Entity source, Sensor<?> sensor) {
+    public synchronized Set<SubscriptionHandle> getSubscriptionsForEntitySensor(Entity source, Sensor<?> sensor) {
         Set<SubscriptionHandle> subscriptions = new LinkedHashSet<SubscriptionHandle>();
         subscriptions.addAll(elvis(subscriptionsByToken.get(makeEntitySensorToken(source, sensor)), Collections.emptySet()));
         subscriptions.addAll(elvis(subscriptionsByToken.get(makeEntitySensorToken(null, sensor)), Collections.emptySet()));

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cfce4732/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.groovy ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.groovy b/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.groovy deleted file mode 100644 index d64d368..0000000 --- a/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.groovy +++ /dev/null @@ -1,91 +0,0 @@ -package brooklyn.management.internal; - -import static org.testng.Assert.* - -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -import org.testng.annotations.Test - -import brooklyn.entity.basic.AbstractGroup -import brooklyn.event.SensorEvent -import brooklyn.event.SensorEventListener -import brooklyn.test.entity.TestApplication -import brooklyn.test.entity.TestEntity - -/** - * testing the {@link SubscriptionManager} and associated classes. 
- */ -public class LocalSubscriptionManagerTest { - - private static final int TIMEOUT_MS = 5000; - - @Test - public void testSubscribeToEntityAttributeChange() { - TestApplication app = new TestApplication() - TestEntity entity = new TestEntity([owner:app]) - CountDownLatch latch = new CountDownLatch(1) - app.subscribe(entity, TestEntity.SEQUENCE, { latch.countDown() } as SensorEventListener) - entity.setSequenceValue(1234) - if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) { - fail "Timeout waiting for Event on TestEntity listener" - } - } - - @Test - public void testSubscribeToEntityWithAttributeWildcard() { - TestApplication app = new TestApplication() - TestEntity entity = new TestEntity([owner:app]) - CountDownLatch latch = new CountDownLatch(1) - app.subscribe(entity, null, { latch.countDown() } as SensorEventListener) - entity.setSequenceValue(1234) - if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) { - fail "Timeout waiting for Event on TestEntity listener" - } - } - - @Test - public void testSubscribeToAttributeChangeWithEntityWildcard() { - TestApplication app = new TestApplication() - TestEntity entity = new TestEntity([owner:app]) - CountDownLatch latch = new CountDownLatch(1) - app.subscribe(null, TestEntity.SEQUENCE, { latch.countDown() } as SensorEventListener) - entity.setSequenceValue(1234) - if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) { - fail "Timeout waiting for Event on TestEntity listener" - } - } - - @Test - public void testSubscribeToChildAttributeChange() { - TestApplication app = new TestApplication() - TestEntity child = new TestEntity([owner:app]) - CountDownLatch latch = new CountDownLatch(1) - app.subscribeToChildren(app, TestEntity.SEQUENCE, { latch.countDown() } as SensorEventListener) - child.setSequenceValue(1234) - if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) { - fail "Timeout waiting for Event on child TestEntity listener" - } - } - - @Test - public void testSubscribeToMemberAttributeChange() { - 
TestApplication app = new TestApplication() - AbstractGroup group = new AbstractGroup([owner:app]) {} - TestEntity member = new TestEntity([owner:app]) - group.addMember(member); - - List<SensorEvent<Integer>> events = [] - CountDownLatch latch = new CountDownLatch(1) - app.subscribeToMembers(group, TestEntity.SEQUENCE, { events.add(it); latch.countDown() } as SensorEventListener) - member.setAttribute(TestEntity.SEQUENCE, 123) - - if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) { - fail "Timeout waiting for Event on parent TestEntity listener" - } - assertEquals(events.size(), 1) - assertEquals(events.getAt(0).value, 123) - assertEquals(events.getAt(0).sensor, TestEntity.SEQUENCE) - assertEquals(events.getAt(0).source.id, member.id) - } -} http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/cfce4732/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.java b/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.java new file mode 100644 index 0000000..8eda1d8 --- /dev/null +++ b/core/src/test/java/brooklyn/management/internal/LocalSubscriptionManagerTest.java @@ -0,0 +1,154 @@ +package brooklyn.management.internal; + +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import brooklyn.entity.basic.BasicGroup; +import brooklyn.event.SensorEvent; +import brooklyn.event.SensorEventListener; +import brooklyn.management.SubscriptionHandle; +import brooklyn.management.SubscriptionManager; +import 
brooklyn.test.entity.TestApplication; +import brooklyn.test.entity.TestEntity; + +/** + * testing the {@link SubscriptionManager} and associated classes. + */ +public class LocalSubscriptionManagerTest { + + private static final int TIMEOUT_MS = 5000; + + private TestApplication app; + + @BeforeMethod + public void setUp() throws Exception { + app = new TestApplication(); + } + + @Test + public void testSubscribeToEntityAttributeChange() throws Exception { + TestEntity entity = new TestEntity(app); + final CountDownLatch latch = new CountDownLatch(1); + app.subscribe(entity, TestEntity.SEQUENCE, new SensorEventListener<Object>() { + @Override public void onEvent(SensorEvent<Object> event) { + latch.countDown(); + }}); + entity.setSequenceValue(1234); + if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + fail("Timeout waiting for Event on TestEntity listener"); + } + } + + @Test + public void testSubscribeToEntityWithAttributeWildcard() throws Exception { + TestEntity entity = new TestEntity(app); + final CountDownLatch latch = new CountDownLatch(1); + app.subscribe(entity, null, new SensorEventListener<Object>() { + @Override public void onEvent(SensorEvent<Object> event) { + latch.countDown(); + }}); + entity.setSequenceValue(1234); + if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + fail("Timeout waiting for Event on TestEntity listener"); + } + } + + @Test + public void testSubscribeToAttributeChangeWithEntityWildcard() throws Exception { + TestEntity entity = new TestEntity(app); + final CountDownLatch latch = new CountDownLatch(1); + app.subscribe(null, TestEntity.SEQUENCE, new SensorEventListener<Object>() { + @Override public void onEvent(SensorEvent<Object> event) { + latch.countDown(); + }}); + entity.setSequenceValue(1234); + if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + fail("Timeout waiting for Event on TestEntity listener"); + } + } + + @Test + public void testSubscribeToChildAttributeChange() throws Exception { + TestEntity 
child = new TestEntity(app); + final CountDownLatch latch = new CountDownLatch(1); + app.subscribeToChildren(app, TestEntity.SEQUENCE, new SensorEventListener<Object>() { + @Override public void onEvent(SensorEvent<Object> event) { + latch.countDown(); + }}); + child.setSequenceValue(1234); + if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + fail("Timeout waiting for Event on child TestEntity listener"); + } + } + + @Test + public void testSubscribeToMemberAttributeChange() throws Exception { + BasicGroup group = new BasicGroup(app); + TestEntity member = new TestEntity(app); + group.addMember(member); + + final List<SensorEvent<Integer>> events = new CopyOnWriteArrayList<SensorEvent<Integer>>(); + final CountDownLatch latch = new CountDownLatch(1); + app.subscribeToMembers(group, TestEntity.SEQUENCE, new SensorEventListener<Integer>() { + @Override public void onEvent(SensorEvent<Integer> event) { + events.add(event); + latch.countDown(); + }}); + member.setAttribute(TestEntity.SEQUENCE, 123); + + if (!latch.await(TIMEOUT_MS, TimeUnit.MILLISECONDS)) { + fail("Timeout waiting for Event on parent TestEntity listener"); + } + assertEquals(events.size(), 1); + assertEquals(events.get(0).getValue(), (Integer)123); + assertEquals(events.get(0).getSensor(), TestEntity.SEQUENCE); + assertEquals(events.get(0).getSource().getId(), member.getId()); + } + + // Regression test for ConcurrentModificationException in issue #327 + @Test(groups="Integration") + public void testConcurrentSubscribingAndPublishing() throws Exception { + final AtomicReference<Exception> threadException = new AtomicReference<Exception>(); + TestEntity entity = new TestEntity(app); + + // Repeatedly subscribe and unsubscribe, so listener-set constantly changing while publishing to it. + // First create a stable listener so it is always the same listener-set object. 
+ Thread thread = new Thread() { + public void run() { + try { + SensorEventListener<Object> noopListener = new SensorEventListener<Object>() { + @Override public void onEvent(SensorEvent<Object> event) { + } + }; + app.getSubscriptionContext().subscribe(null, TestEntity.SEQUENCE, noopListener); + while (!Thread.currentThread().isInterrupted()) { + SubscriptionHandle handle = app.getSubscriptionContext().subscribe(null, TestEntity.SEQUENCE, noopListener); + app.getSubscriptionContext().unsubscribe(handle); + } + } catch (Exception e) { + threadException.set(e); + } + } + }; + + try { + thread.start(); + for (int i = 0; i < 10000; i++) { + entity.setAttribute(TestEntity.SEQUENCE, i); + } + } finally { + thread.interrupt(); + } + + if (threadException.get() != null) throw threadException.get(); + } +}
