This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit f887c9afd8b10642e53b8a2ada7a418697c4c6db
Author: Alex Heneveld <[email protected]>
AuthorDate: Fri Jul 19 15:28:19 2024 +0100

    don't allow blocking when entities are creating
---
 .../spi/dsl/BrooklynDslDeferredSupplier.java       |  4 +--
 .../mgmt/internal/LocalSubscriptionManager.java    | 18 ++++++----
 .../core/objs/proxy/InternalEntityFactory.java     | 39 +++++++++++++++++++---
 .../core/workflow/WorkflowInitializer.java         | 17 ++++++++--
 .../core/workflow/WorkflowStepResolution.java      |  7 ++--
 .../org/apache/brooklyn/util/concurrent/Locks.java | 15 +++++++--
 6 files changed, 79 insertions(+), 21 deletions(-)

diff --git 
a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
 
b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
index 971e571c09..ea4b3ce02f 100644
--- 
a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
+++ 
b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java
@@ -125,8 +125,8 @@ public abstract class BrooklynDslDeferredSupplier<T> 
implements DeferredSupplier
         Task<T> task = newTask();
         T result;
         try {
-            result = exec.submit(task).get();
-        } catch (InterruptedException | ExecutionException e) {
+            result = exec.get(task);
+        } catch (Exception e) {
             Task<?> currentTask = Tasks.current();
             if (currentTask != null && currentTask.isCancelled()) {
                 task.cancel(true);
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
index 6a6b1f21fb..6d17612616 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java
@@ -25,10 +25,7 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.Multimaps;
 import org.apache.brooklyn.api.entity.Entity;
-import org.apache.brooklyn.api.mgmt.ExecutionContext;
-import org.apache.brooklyn.api.mgmt.ExecutionManager;
-import org.apache.brooklyn.api.mgmt.SubscriptionHandle;
-import org.apache.brooklyn.api.mgmt.SubscriptionManager;
+import org.apache.brooklyn.api.mgmt.*;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.api.sensor.Sensor;
 import org.apache.brooklyn.api.sensor.SensorEvent;
@@ -42,6 +39,7 @@ import org.apache.brooklyn.util.collections.MutableList;
 import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.core.task.BasicExecutionContext;
 import org.apache.brooklyn.util.core.task.BasicExecutionManager;
+import org.apache.brooklyn.util.core.task.BasicTask;
 import org.apache.brooklyn.util.core.task.SingleThreadedScheduler;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.text.Identifiers;
@@ -166,7 +164,7 @@ public class LocalSubscriptionManager extends 
AbstractSubscriptionManager {
 
         if (notifyOfInitialValue) {
             if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} 
-> {} to {}", new Object[] {s.producer, s.sensor, s});
-            // this is run asynchronously to prevent deadlock when trying to 
get attribute and publish;
+            // after entity creation, this is run asynchronously to prevent 
deadlock when trying to get attribute and publish;
             // however we want it:
             // (a) to run in the same order as subscriptions are made, so use 
the manager tag scheduler
             // (b) ideally to use the last value that was not published to 
this target, and
@@ -179,8 +177,10 @@ public class LocalSubscriptionManager extends 
AbstractSubscriptionManager {
             // window between adding the subscription and taking the last 
value,
             // we will think the last value hasn't changed.  but we will never 
send a
             // wrong value as this backs out if there is any confusion over 
the last value.
-            em.submit(
-                MutableMap.of("tags", getPublishTags(s, s.producer),
+            // while the entity is being created, it should run in the same 
thread.
+            boolean isPreManagement = s.producer!=null && 
!((EntityInternal)s.producer).getManagementSupport().wasDeployed();
+
+            Task<Void> t = new BasicTask(MutableMap.of("tags", 
getPublishTags(s, s.producer),
                     "displayName", "Initial value publication on subscription 
to "+s.sensor.getName()),
                 () -> {
                     T val = (T) s.producer.sensors().get((AttributeSensor<?>) 
s.sensor);
@@ -199,6 +199,10 @@ public class LocalSubscriptionManager extends 
AbstractSubscriptionManager {
                     // by ourselves, as we are already in the right thread now 
and can prevent interleaving this way
                     submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, 
s.producer, val), true);
                 });
+
+            if (isPreManagement) 
((EntityInternal)s.producer).getExecutionContext().get(t);
+            else em.submit(t);
+
         }
         
         return s;
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
 
b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
index 32915efcbe..b0f7454ed7 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java
@@ -51,10 +51,13 @@ import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.core.flags.FlagUtils;
 import org.apache.brooklyn.util.core.flags.TypeCoercions;
+import org.apache.brooklyn.util.core.task.ImmediateSupplier;
 import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.exceptions.Exceptions;
+import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
 import org.apache.brooklyn.util.javalang.AggregateClassLoader;
 import org.apache.brooklyn.util.javalang.Reflections;
+import org.apache.brooklyn.util.javalang.Threads;
 import org.apache.brooklyn.util.text.Strings;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -67,6 +70,7 @@ import java.util.Map.Entry;
 import java.util.Objects;
 import java.util.Queue;
 import java.util.Set;
+import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.Stream;
 
@@ -193,10 +197,21 @@ public class InternalEntityFactory extends 
InternalFactory {
         Map<String,Entity> entitiesByEntityId = MutableMap.of();
         Map<String,EntitySpec<?>> specsByEntityId = MutableMap.of();
 
+
         T entity = createEntityAndDescendantsUninitialized(0, spec, options, 
entitiesByEntityId, specsByEntityId);
         try {
-            initEntityAndDescendants(entity.getId(), entitiesByEntityId, 
specsByEntityId, options);
+            // prevent this from blocking
+            Thread.currentThread().interrupt();
+            try {
+                initEntityAndDescendants(entity.getId(), entitiesByEntityId, 
specsByEntityId, options);
+            } finally {
+                // end of non-blocking portion
+                Thread.interrupted();
+            }
         } catch (RuntimeException ex) {
+            // end of non-blocking portion
+            Thread.interrupted();
+
             options.onException(ex, (e) -> {
                 Exceptions.propagateIfFatal(e);
                 log.info("Failed to initialise entity " + entity + " and its 
descendants - unmanaging and propagating original exception: " + 
Exceptions.collapseText(e));
@@ -210,6 +225,7 @@ public class InternalEntityFactory extends InternalFactory {
                 throw e;
             });
         }
+
         return entity;
     }
 
@@ -407,18 +423,32 @@ public class InternalEntityFactory extends 
InternalFactory {
                 }
                 ((AbstractEntity)entity).addLocations(spec.getLocations());
 
+                BiConsumer<String,Runnable> runNowOrLater = (name, runnable) 
-> {
+                    try {
+                        runnable.run();
+                    } catch (Exception e) {
+                        Throwable interrupt = 
Exceptions.getFirstThrowableMatching(e, t -> t instanceof InterruptedException 
|| t instanceof RuntimeInterruptedException || t instanceof 
ImmediateSupplier.ImmediateValueNotAvailableException);
+                        if (interrupt != null) {
+                            log.debug("Unable to create " + name + " as part 
of initializing " + entity + " (will submit deferred): " + e);
+                            Entities.submit(entity, Tasks.create(name, 
runnable));
+                        } else {
+                            throw Exceptions.propagate(e);
+                        }
+                    }
+                };
+
                 List<EntityInitializer> initializers = 
Stream.concat(getGlobalDeploymentInitializers().stream(), 
spec.getInitializers().stream())
                         .collect(Collectors.toList());
                 for (EntityInitializer initializer: initializers) {
-                    initializer.apply((EntityInternal)entity);
+                    runNowOrLater.accept(""+initializer, () -> 
initializer.apply((EntityInternal)entity));
                 }
 
                 for (EnricherSpec<?> enricherSpec : spec.getEnricherSpecs()) {
-                    
entity.enrichers().add(policyFactory.createEnricher(enricherSpec));
+                    runNowOrLater.accept(""+enricherSpec, () -> 
entity.enrichers().add(policyFactory.createEnricher(enricherSpec)));
                 }
 
                 for (PolicySpec<?> policySpec : spec.getPolicySpecs()) {
-                    
entity.policies().add(policyFactory.createPolicy(policySpec));
+                    runNowOrLater.accept(""+policySpec, () -> 
entity.policies().add(policyFactory.createPolicy(policySpec)));
                 }
 
                 for (Entity child: entity.getChildren()) {
@@ -430,6 +460,7 @@ public class InternalEntityFactory extends InternalFactory {
                 if (entity instanceof EntityPostInitializable) {
                     ((EntityPostInitializable)entity).postInit();
                 }
+
             }
         }).build());
     }
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowInitializer.java 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowInitializer.java
index 0e80227a9b..d4411c407a 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowInitializer.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowInitializer.java
@@ -111,9 +111,20 @@ public class WorkflowInitializer extends 
EntityInitializers.InitializerPatternWi
                             if (delayDuration.isPositive()) 
Time.sleep(delayDuration);
                         }));
                     }
-                    Task<Object> submitted = delayed ? 
DynamicTasks.queue(task.get()) : Entities.submit(entity, task.get());
-                    if (delayed) DynamicTasks.waitForLast();
-                    Object result = submitted.getUnchecked();
+                    Object result;
+                    if (delayed) {
+                        result = DynamicTasks.queue(task.get());
+                        DynamicTasks.waitForLast();
+                    } else {
+                        //result = 
((EntityInternal)entity).getExecutionContext().get(task.get());
+                        if (((EntityInternal) 
entity).getManagementSupport().wasDeployed()) {
+                            result = Entities.submit(entity, 
task.get()).getUnchecked();
+                        } else {
+                            // if initializer is pre-deployment, it should run 
in background, deferred until after management started
+                            Entities.submit(entity, task.get());
+                            result = "<in progress>";
+                        }
+                    }
                     log.debug("Applied workflow initializer on " + entity + ", 
result: " + result);
                     return result;
                 }
diff --git 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepResolution.java
 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepResolution.java
index e356ee1a56..f2d9555842 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepResolution.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepResolution.java
@@ -30,6 +30,7 @@ import 
org.apache.brooklyn.api.mgmt.classloading.BrooklynClassLoadingContext;
 import org.apache.brooklyn.api.objs.BrooklynObject;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityAdjuncts.EntityAdjunctProxyable;
+import org.apache.brooklyn.core.entity.EntityInternal;
 import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.mgmt.internal.AbstractManagementContext;
 import org.apache.brooklyn.core.objs.BrooklynObjectInternal;
@@ -228,9 +229,9 @@ public class WorkflowStepResolution {
             if (entity==null) {
                 def = converter.call();
             } else {
-                // run in a task context if we can, to facilitate conversion 
and type lookup
-                def = Entities.submit(entity,
-                        Tasks.builder().displayName("convert 
steps").body(converter).tag(BrooklynTaskTags.TRANSIENT_TASK_TAG).build()).getUnchecked();
+                // run in a task context if we can, to facilitate conversion 
and type lookup; run in same thread, so we can do this at deploy time
+                def = ((EntityInternal)entity).getExecutionContext().get(
+                        Tasks.builder().displayName("convert 
steps").body(converter).tag(BrooklynTaskTags.TRANSIENT_TASK_TAG).build());
             }
 
             if (def instanceof 
WorkflowStepDefinition.WorkflowStepDefinitionWithSpecialDeserialization) {
diff --git 
a/utils/common/src/main/java/org/apache/brooklyn/util/concurrent/Locks.java 
b/utils/common/src/main/java/org/apache/brooklyn/util/concurrent/Locks.java
index 6efef088c6..0cea384c17 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/util/concurrent/Locks.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/util/concurrent/Locks.java
@@ -26,8 +26,19 @@ import org.apache.brooklyn.util.exceptions.Exceptions;
 public class Locks {
 
     public static <T> T withLock(Lock lock, Callable<T> body) {
+        boolean locked = false;
         try {
-            lock.lockInterruptibly();
+            if (Thread.currentThread().isInterrupted()) {
+                if (!lock.tryLock()) {
+                    String msg = "Lock is held by another thread, and waiting 
not permitted: " + lock;
+                    if (!lock.tryLock()) {
+                        throw new InterruptedException(msg);
+                    }
+                }
+            } else {
+                lock.lockInterruptibly();
+            }
+            locked = true;
         } catch (InterruptedException e) {
             throw Exceptions.propagate(e);
         }
@@ -36,7 +47,7 @@ public class Locks {
         } catch (Exception e) {
             throw Exceptions.propagate(e);
         } finally {
-            lock.unlock();
+            if (locked) lock.unlock();
         }       
     }
     

Reply via email to