Repository: brooklyn-server
Updated Branches:
  refs/heads/master adfa95d71 -> 0a1064f20


BROOKLYN-352: fix ConcurrentModification in getAllEntitiesInApplication

Project: http://git-wip-us.apache.org/repos/asf/brooklyn-server/repo
Commit: http://git-wip-us.apache.org/repos/asf/brooklyn-server/commit/00c8f1c6
Tree: http://git-wip-us.apache.org/repos/asf/brooklyn-server/tree/00c8f1c6
Diff: http://git-wip-us.apache.org/repos/asf/brooklyn-server/diff/00c8f1c6

Branch: refs/heads/master
Commit: 00c8f1c659b5a23e94f62ba023231beb07b5796e
Parents: 880f842
Author: Aled Sage <aled.s...@gmail.com>
Authored: Tue Nov 22 11:55:59 2016 +0000
Committer: Aled Sage <aled.s...@gmail.com>
Committed: Tue Nov 22 11:55:59 2016 +0000

----------------------------------------------------------------------
 .../core/mgmt/internal/LocalEntityManager.java  |  43 ++++++--
 .../core/entity/proxying/EntityManagerTest.java | 108 +++++++++++++++++++
 2 files changed, 144 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00c8f1c6/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalEntityManager.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalEntityManager.java
 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalEntityManager.java
index 6edecfc..3a9d756 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalEntityManager.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalEntityManager.java
@@ -19,6 +19,7 @@
 package org.apache.brooklyn.core.mgmt.internal;
 
 import static com.google.common.base.Preconditions.checkNotNull;
+
 import groovy.util.ObservableList;
 
 import java.lang.reflect.Proxy;
@@ -73,8 +74,8 @@ import com.google.common.base.Function;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Predicate;
 import com.google.common.base.Predicates;
+import com.google.common.collect.FluentIterable;
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
@@ -212,13 +213,41 @@ public class LocalEntityManager implements 
EntityManagerInternal {
 
     @Override
     public Iterable<Entity> getAllEntitiesInApplication(Application 
application) {
+        // To fix https://issues.apache.org/jira/browse/BROOKLYN-352, we need 
to synchronize on
+        // preRegisteredEntitiesById and preManagedEntitiesById while 
iterating over them (because
+        // they are synchronizedMaps). entityProxiesById is a ConcurrentMap, 
so no need to 
+        // synchronize on that.
+        // Only synchronize on one at a time, to avoid the risk of deadlock.
+        
         Predicate<Entity> predicate = 
EntityPredicates.applicationIdEqualTo(application.getId());
-        Iterable<Entity> allentities = 
Iterables.concat(preRegisteredEntitiesById.values(), 
preManagedEntitiesById.values(), entityProxiesById.values());
-        Iterable<Entity> result = Iterables.filter(allentities, predicate);
-        return ImmutableSet.copyOf(Iterables.transform(result, new 
Function<Entity, Entity>() {
-            @Override public Entity apply(Entity input) {
-                return Entities.proxy(input);
-            }}));
+        Set<Entity> result = Sets.newLinkedHashSet();
+        
+        synchronized (preRegisteredEntitiesById) {
+            for (Entity entity : preRegisteredEntitiesById.values()) {
+                if (predicate.apply(entity)) {
+                    result.add(entity);
+                }
+            }
+        }
+        synchronized (preManagedEntitiesById) {
+            for (Entity entity : preManagedEntitiesById.values()) {
+                if (predicate.apply(entity)) {
+                    result.add(entity);
+                }
+            }
+        }
+        for (Entity entity : entityProxiesById.values()) {
+            if (predicate.apply(entity)) {
+                result.add(entity);
+            }
+        }
+        
+        return FluentIterable.from(result)
+                .transform(new Function<Entity, Entity>() {
+                    @Override public Entity apply(Entity input) {
+                        return Entities.proxy(input);
+                    }})
+                .toSet();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/brooklyn-server/blob/00c8f1c6/core/src/test/java/org/apache/brooklyn/core/entity/proxying/EntityManagerTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/org/apache/brooklyn/core/entity/proxying/EntityManagerTest.java
 
b/core/src/test/java/org/apache/brooklyn/core/entity/proxying/EntityManagerTest.java
index 315bec4..7a667c3 100644
--- 
a/core/src/test/java/org/apache/brooklyn/core/entity/proxying/EntityManagerTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/core/entity/proxying/EntityManagerTest.java
@@ -22,9 +22,19 @@ import static org.testng.Assert.assertEquals;
 import static org.testng.Assert.assertFalse;
 import static org.testng.Assert.assertTrue;
 
+import java.util.ConcurrentModificationException;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.brooklyn.api.entity.EntitySpec;
 import org.apache.brooklyn.api.mgmt.EntityManager;
+import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.factory.ApplicationBuilder;
+import org.apache.brooklyn.core.mgmt.internal.LocalEntityManager;
 import org.apache.brooklyn.core.objs.proxy.EntityProxy;
 import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
 import org.apache.brooklyn.core.test.entity.TestApplication;
@@ -32,14 +42,24 @@ import org.apache.brooklyn.core.test.entity.TestEntity;
 import org.apache.brooklyn.core.test.entity.TestEntityImpl;
 import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Predicates;
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.Lists;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
 
 public class EntityManagerTest extends BrooklynAppUnitTestSupport {
 
+    private static final Logger LOG = 
LoggerFactory.getLogger(EntityManagerTest.class);
+
     private EntityManager entityManager;
 
     @BeforeMethod(alwaysRun=true)
@@ -80,4 +100,92 @@ public class EntityManagerTest extends 
BrooklynAppUnitTestSupport {
         
Asserts.assertEqualsIgnoringOrder(entityManager.findEntities(Predicates.instanceOf(TestApplication.class)),
 ImmutableList.of(app, app2));
         
Asserts.assertEqualsIgnoringOrder(entityManager.findEntitiesInApplication(app, 
Predicates.instanceOf(TestApplication.class)), ImmutableList.of(app));
     }
+    
+    // See https://issues.apache.org/jira/browse/BROOKLYN-352
+    // Before the fix, 250ms was sufficient to cause the 
ConcurrentModificationException
+    @Test
+    public void testGetAllEntitiesWhileEntitiesAddedAndRemoved() throws 
Exception {
+        runGetAllEntitiesWhileEntitiesAddedAndRemoved(Duration.millis(250));
+    }
+    
+    @Test(groups="Integration")
+    public void testGetAllEntitiesWhileEntitiesAddedAndRemovedManyTimes() 
throws Exception {
+        runGetAllEntitiesWhileEntitiesAddedAndRemoved(Duration.seconds(10));
+    }
+    
+    /**
+     * See https://issues.apache.org/jira/browse/BROOKLYN-352.
+     * 
+     * Tests for a {@link ConcurrentModificationException} in 
+     * {@link 
LocalEntityManager#getAllEntitiesInApplication(org.apache.brooklyn.api.entity.Application)}
+     * by running multiple threads which continually call that method, while 
also running multiple
+     * threads that add/remove entities (thus modifying the collections being 
inspected by
+     * {@code getAllEntitiesInApplication}).
+     */
+    protected void runGetAllEntitiesWhileEntitiesAddedAndRemoved(Duration 
duration) throws Exception {
+        final int NUM_GETTER_THREADS = 10;
+        final int NUM_ENTITY_LIFECYCLE_THREADS = 10;
+        
+        final AtomicBoolean running = new AtomicBoolean(true);
+        List<ListenableFuture<?>> futures = Lists.newArrayList();
+        ListeningExecutorService executor = 
MoreExecutors.listeningDecorator(Executors.newCachedThreadPool());
+        try {
+            for (int i = 0; i < NUM_GETTER_THREADS; i++) {
+                ListenableFuture<?> future = executor.submit(new 
Callable<Void>() {
+                    public Void call() throws Exception {
+                        int numCycles = 0;
+                        try {
+                            while (running.get()) {
+                                
((LocalEntityManager)entityManager).getAllEntitiesInApplication(app);
+                                numCycles++;
+                            }
+                            LOG.info("Executed getAllEntitiesInApplication " + 
numCycles + " times");
+                            return null;
+                        } catch (Exception e) {
+                            LOG.error("Error in task for 
getAllEntitiesInApplication, cycle " + numCycles, e);
+                            throw e;
+                        }
+                    }});
+                futures.add(future);
+            }
+
+            for (int i = 0; i < NUM_ENTITY_LIFECYCLE_THREADS; i++) {
+                ListenableFuture<?> future = executor.submit(new 
Callable<Void>() {
+                    public Void call() {
+                        List<TestEntity> entities = Lists.newLinkedList();
+                        int numCycles = 0;
+                        try {
+                            while (running.get()) {
+                                for (int i = 0; i < 10; i++) {
+                                    TestEntity entity = 
app.addChild(EntitySpec.create(TestEntity.class));
+                                    entities.add(entity);
+                                    if (!running.get()) break;
+                                }
+                                for (int i = 0; i < 10; i++) {
+                                    Entities.unmanage(entities.remove(0));
+                                    if (!running.get()) break;
+                                }
+                                numCycles++;
+                            }
+                            LOG.info("Executed add/remove children " + 
numCycles + " cycles (" + (numCycles*10) + " entities)");
+                            return null;
+                        } catch (Exception e) {
+                            LOG.error("Error in task for add/remove children, 
cycle " + numCycles, e);
+                            throw e;
+                        }
+                    }});
+                futures.add(future);
+            }
+        
+            try {
+                Futures.allAsList(futures).get(duration.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+            } catch (TimeoutException e) {
+                // This is good; it means we're still running after the given duration 
with no exceptions
+            }
+            running.set(false);
+            
Futures.allAsList(futures).get(Asserts.DEFAULT_LONG_TIMEOUT.toMilliseconds(), 
TimeUnit.MILLISECONDS);
+        } finally {
+            executor.shutdownNow();
+        }
+    }
 }

Reply via email to