Repository: brooklyn-server Updated Branches: refs/heads/master adfa95d71 -> 0a1064f20
BROOKLYN-352: fix ConcurrentModification in getAllEntitiesInApplication Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/00c8f1c6 Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/00c8f1c6 Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/00c8f1c6 Branch: refs/heads/master Commit: 00c8f1c659b5a23e94f62ba023231beb07b5796e Parents: 880f842 Author: Aled Sage <aled.s...@gmail.com> Authored: Tue Nov 22 11:55:59 2016 +0000 Committer: Aled Sage <aled.s...@gmail.com> Committed: Tue Nov 22 11:55:59 2016 +0000 ---------------------------------------------------------------------- .../core/mgmt/internal/LocalEntityManager.java | 43 ++++++-- .../core/entity/proxying/EntityManagerTest.java | 108 +++++++++++++++++++ 2 files changed, 144 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00c8f1c6/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalEntityManager.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalEntityManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalEntityManager.java index 6edecfc..3a9d756 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalEntityManager.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalEntityManager.java @@ -19,6 +19,7 @@ package org.apache.brooklyn.core.mgmt.internal; import static com.google.common.base.Preconditions.checkNotNull; + import groovy.util.ObservableList; import java.lang.reflect.Proxy; @@ -73,8 +74,8 @@ import com.google.common.base.Function; import com.google.common.base.Preconditions; import com.google.common.base.Predicate; import com.google.common.base.Predicates; +import com.google.common.collect.FluentIterable; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.collect.Maps; @@ -212,13 +213,41 @@ public class LocalEntityManager implements EntityManagerInternal { @Override public Iterable<Entity> getAllEntitiesInApplication(Application application) { + // To fix https://issues.apache.org/jira/browse/BROOKLYN-352, we need to synchronize on + // preRegisteredEntitiesById and preManagedEntitiesById while iterating over them (because + // they are synchronizedMaps). entityProxiesById is a ConcurrentMap, so no need to + // synchronize on that. + // Only synchronize on one at a time, to avoid the risk of deadlock. + Predicate<Entity> predicate = EntityPredicates.applicationIdEqualTo(application.getId()); - Iterable<Entity> allentities = Iterables.concat(preRegisteredEntitiesById.values(), preManagedEntitiesById.values(), entityProxiesById.values()); - Iterable<Entity> result = Iterables.filter(allentities, predicate); - return ImmutableSet.copyOf(Iterables.transform(result, new Function<Entity, Entity>() { - @Override public Entity apply(Entity input) { - return Entities.proxy(input); - }})); + Set<Entity> result = Sets.newLinkedHashSet(); + + synchronized (preRegisteredEntitiesById) { + for (Entity entity : preRegisteredEntitiesById.values()) { + if (predicate.apply(entity)) { + result.add(entity); + } + } + } + synchronized (preManagedEntitiesById) { + for (Entity entity : preManagedEntitiesById.values()) { + if (predicate.apply(entity)) { + result.add(entity); + } + } + } + for (Entity entity : entityProxiesById.values()) { + if (predicate.apply(entity)) { + result.add(entity); + } + } + + return FluentIterable.from(result) + .transform(new Function<Entity, Entity>() { + @Override public Entity apply(Entity input) { + return Entities.proxy(input); + }}) + .toSet(); } @Override http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00c8f1c6/core/src/test/java/org/apache/brooklyn/core/entity/proxying/EntityManagerTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/org/apache/brooklyn/core/entity/proxying/EntityManagerTest.java b/core/src/test/java/org/apache/brooklyn/core/entity/proxying/EntityManagerTest.java index 315bec4..7a667c3 100644 --- a/core/src/test/java/org/apache/brooklyn/core/entity/proxying/EntityManagerTest.java +++ b/core/src/test/java/org/apache/brooklyn/core/entity/proxying/EntityManagerTest.java @@ -22,9 +22,19 @@ import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; import static org.testng.Assert.assertTrue; +import java.util.ConcurrentModificationException; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + import org.apache.brooklyn.api.entity.EntitySpec; import org.apache.brooklyn.api.mgmt.EntityManager; +import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.factory.ApplicationBuilder; +import org.apache.brooklyn.core.mgmt.internal.LocalEntityManager; import org.apache.brooklyn.core.objs.proxy.EntityProxy; import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; import org.apache.brooklyn.core.test.entity.TestApplication; @@ -32,14 +42,24 @@ import org.apache.brooklyn.core.test.entity.TestEntity; import org.apache.brooklyn.core.test.entity.TestEntityImpl; import org.apache.brooklyn.test.Asserts; import org.apache.brooklyn.util.collections.MutableMap; +import org.apache.brooklyn.util.time.Duration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import com.google.common.base.Predicates; import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; +import com.google.common.util.concurrent.Futures; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.ListeningExecutorService; +import com.google.common.util.concurrent.MoreExecutors; public class EntityManagerTest extends BrooklynAppUnitTestSupport { + private static final Logger LOG = LoggerFactory.getLogger(EntityManagerTest.class); + private EntityManager entityManager; @BeforeMethod(alwaysRun=true) @@ -80,4 +100,92 @@ public class EntityManagerTest extends BrooklynAppUnitTestSupport { Asserts.assertEqualsIgnoringOrder(entityManager.findEntities(Predicates.instanceOf(TestApplication.class)), ImmutableList.of(app, app2)); Asserts.assertEqualsIgnoringOrder(entityManager.findEntitiesInApplication(app, Predicates.instanceOf(TestApplication.class)), ImmutableList.of(app)); } + + // See https://issues.apache.org/jira/browse/BROOKLYN-352 + // Before the fix, 250ms was sufficient to cause the ConcurrentModificationException + @Test + public void testGetAllEntitiesWhileEntitiesAddedAndRemoved() throws Exception { + runGetAllEntitiesWhileEntitiesAddedAndRemoved(Duration.millis(250)); + } + + @Test(groups="Integration") + public void testGetAllEntitiesWhileEntitiesAddedAndRemovedManyTimes() throws Exception { + runGetAllEntitiesWhileEntitiesAddedAndRemoved(Duration.seconds(10)); + } + + /** + * See https://issues.apache.org/jira/browse/BROOKLYN-352. + * + * Tests for a {@link ConcurrentModificationException} in + * {@link LocalEntityManager#getAllEntitiesInApplication(org.apache.brooklyn.api.entity.Application)} + * by running multiple threads which continually call that method, while also running multiple + * threads that add/remove entities (thus modifying the collections being inspected by + * {@code getAllEntitiesInApplication}. + */ + protected void runGetAllEntitiesWhileEntitiesAddedAndRemoved(Duration duration) throws Exception { + final int NUM_GETTER_THREADS = 10; + final int NUM_ENTITY_LIFECYCLE_THREADS = 10; + + final AtomicBoolean running = new AtomicBoolean(true); + List<ListenableFuture<?>> futures = Lists.newArrayList(); + ListeningExecutorService executor = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); + try { + for (int i = 0; i < NUM_GETTER_THREADS; i++) { + ListenableFuture<?> future = executor.submit(new Callable<Void>() { + public Void call() throws Exception { + int numCycles = 0; + try { + while (running.get()) { + ((LocalEntityManager)entityManager).getAllEntitiesInApplication(app); + numCycles++; + } + LOG.info("Executed getAllEntitiesInApplication " + numCycles + " times"); + return null; + } catch (Exception e) { + LOG.error("Error in task for getAllEntitiesInApplication, cycle " + numCycles, e); + throw e; + } + }}); + futures.add(future); + } + + for (int i = 0; i < NUM_ENTITY_LIFECYCLE_THREADS; i++) { + ListenableFuture<?> future = executor.submit(new Callable<Void>() { + public Void call() { + List<TestEntity> entities = Lists.newLinkedList(); + int numCycles = 0; + try { + while (running.get()) { + for (int i = 0; i < 10; i++) { + TestEntity entity = app.addChild(EntitySpec.create(TestEntity.class)); + entities.add(entity); + if (!running.get()) break; + } + for (int i = 0; i < 10; i++) { + Entities.unmanage(entities.remove(0)); + if (!running.get()) break; + } + numCycles++; + } + LOG.info("Executed add/remove children " + numCycles + " cycles (" + (numCycles*10) + " entities)"); + return null; + } catch (Exception e) { + LOG.error("Error in task for add/remove children, cycle " + numCycles, e); + throw e; + } + }}); + futures.add(future); + } + + try { + Futures.allAsList(futures).get(duration.toMilliseconds(), TimeUnit.MILLISECONDS); + } catch (TimeoutException e) { + // This is good; it means we're still running after 10 seconds with no exceptions + } + running.set(false); + Futures.allAsList(futures).get(Asserts.DEFAULT_LONG_TIMEOUT.toMilliseconds(), TimeUnit.MILLISECONDS); + } finally { + executor.shutdownNow(); + } + } }