This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit f887c9afd8b10642e53b8a2ada7a418697c4c6db Author: Alex Heneveld <[email protected]> AuthorDate: Fri Jul 19 15:28:19 2024 +0100 don't allow blocking when entities are creating --- .../spi/dsl/BrooklynDslDeferredSupplier.java | 4 +-- .../mgmt/internal/LocalSubscriptionManager.java | 18 ++++++---- .../core/objs/proxy/InternalEntityFactory.java | 39 +++++++++++++++++++--- .../core/workflow/WorkflowInitializer.java | 17 ++++++++-- .../core/workflow/WorkflowStepResolution.java | 7 ++-- .../org/apache/brooklyn/util/concurrent/Locks.java | 15 +++++++-- 6 files changed, 79 insertions(+), 21 deletions(-) diff --git a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java index 971e571c09..ea4b3ce02f 100644 --- a/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java +++ b/camp/camp-brooklyn/src/main/java/org/apache/brooklyn/camp/brooklyn/spi/dsl/BrooklynDslDeferredSupplier.java @@ -125,8 +125,8 @@ public abstract class BrooklynDslDeferredSupplier<T> implements DeferredSupplier Task<T> task = newTask(); T result; try { - result = exec.submit(task).get(); - } catch (InterruptedException | ExecutionException e) { + result = exec.get(task); + } catch (Exception e) { Task<?> currentTask = Tasks.current(); if (currentTask != null && currentTask.isCancelled()) { task.cancel(true); diff --git a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java index 6a6b1f21fb..6d17612616 100644 --- a/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java +++ b/core/src/main/java/org/apache/brooklyn/core/mgmt/internal/LocalSubscriptionManager.java @@ -25,10 +25,7 @@ import com.google.common.collect.HashMultimap; import 
com.google.common.collect.ImmutableList; import com.google.common.collect.Multimaps; import org.apache.brooklyn.api.entity.Entity; -import org.apache.brooklyn.api.mgmt.ExecutionContext; -import org.apache.brooklyn.api.mgmt.ExecutionManager; -import org.apache.brooklyn.api.mgmt.SubscriptionHandle; -import org.apache.brooklyn.api.mgmt.SubscriptionManager; +import org.apache.brooklyn.api.mgmt.*; import org.apache.brooklyn.api.sensor.AttributeSensor; import org.apache.brooklyn.api.sensor.Sensor; import org.apache.brooklyn.api.sensor.SensorEvent; @@ -42,6 +39,7 @@ import org.apache.brooklyn.util.collections.MutableList; import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.core.task.BasicExecutionContext; import org.apache.brooklyn.util.core.task.BasicExecutionManager; +import org.apache.brooklyn.util.core.task.BasicTask; import org.apache.brooklyn.util.core.task.SingleThreadedScheduler; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.text.Identifiers; @@ -166,7 +164,7 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager { if (notifyOfInitialValue) { if (LOG.isTraceEnabled()) LOG.trace("sending initial value of {} -> {} to {}", new Object[] {s.producer, s.sensor, s}); - // this is run asynchronously to prevent deadlock when trying to get attribute and publish; + // after entity creation, this is run asynchronously to prevent deadlock when trying to get attribute and publish; // however we want it: // (a) to run in the same order as subscriptions are made, so use the manager tag scheduler // (b) ideally to use the last value that was not published to this target, and @@ -179,8 +177,10 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager { // window between adding the subscription and taking the last value, // we will think the last value hasn't changed. 
but we will never send a // wrong value as this backs out if there is any confusion over the last value. - em.submit( - MutableMap.of("tags", getPublishTags(s, s.producer), + // while the entity is being created, it should run in the same thread. + boolean isPreManagement = s.producer!=null && !((EntityInternal)s.producer).getManagementSupport().wasDeployed(); + + Task<Void> t = new BasicTask(MutableMap.of("tags", getPublishTags(s, s.producer), "displayName", "Initial value publication on subscription to "+s.sensor.getName()), () -> { T val = (T) s.producer.sensors().get((AttributeSensor<?>) s.sensor); @@ -199,6 +199,10 @@ public class LocalSubscriptionManager extends AbstractSubscriptionManager { // by ourselves, as we are already in the right thread now and can prevent interleaving this way submitPublishEvent(s, new BasicSensorEvent<T>(s.sensor, s.producer, val), true); }); + + if (isPreManagement) ((EntityInternal)s.producer).getExecutionContext().get(t); + else em.submit(t); + } return s; diff --git a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java index 32915efcbe..b0f7454ed7 100644 --- a/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java +++ b/core/src/main/java/org/apache/brooklyn/core/objs/proxy/InternalEntityFactory.java @@ -51,10 +51,13 @@ import org.apache.brooklyn.util.collections.MutableMap; import org.apache.brooklyn.util.collections.MutableSet; import org.apache.brooklyn.util.core.flags.FlagUtils; import org.apache.brooklyn.util.core.flags.TypeCoercions; +import org.apache.brooklyn.util.core.task.ImmediateSupplier; import org.apache.brooklyn.util.core.task.Tasks; import org.apache.brooklyn.util.exceptions.Exceptions; +import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; import org.apache.brooklyn.util.javalang.AggregateClassLoader; import org.apache.brooklyn.util.javalang.Reflections; 
+import org.apache.brooklyn.util.javalang.Threads; import org.apache.brooklyn.util.text.Strings; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -67,6 +70,7 @@ import java.util.Map.Entry; import java.util.Objects; import java.util.Queue; import java.util.Set; +import java.util.function.BiConsumer; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -193,10 +197,21 @@ public class InternalEntityFactory extends InternalFactory { Map<String,Entity> entitiesByEntityId = MutableMap.of(); Map<String,EntitySpec<?>> specsByEntityId = MutableMap.of(); + T entity = createEntityAndDescendantsUninitialized(0, spec, options, entitiesByEntityId, specsByEntityId); try { - initEntityAndDescendants(entity.getId(), entitiesByEntityId, specsByEntityId, options); + // prevent this from blocking + Thread.currentThread().interrupt(); + try { + initEntityAndDescendants(entity.getId(), entitiesByEntityId, specsByEntityId, options); + } finally { + // end of non-blocking portion + Thread.interrupted(); + } } catch (RuntimeException ex) { + // end of non-blocking portion + Thread.interrupted(); + options.onException(ex, (e) -> { Exceptions.propagateIfFatal(e); log.info("Failed to initialise entity " + entity + " and its descendants - unmanaging and propagating original exception: " + Exceptions.collapseText(e)); @@ -210,6 +225,7 @@ public class InternalEntityFactory extends InternalFactory { throw e; }); } + return entity; } @@ -407,18 +423,32 @@ public class InternalEntityFactory extends InternalFactory { } ((AbstractEntity)entity).addLocations(spec.getLocations()); + BiConsumer<String,Runnable> runNowOrLater = (name, runnable) -> { + try { + runnable.run(); + } catch (Exception e) { + Throwable interrupt = Exceptions.getFirstThrowableMatching(e, t -> t instanceof InterruptedException || t instanceof RuntimeInterruptedException || t instanceof ImmediateSupplier.ImmediateValueNotAvailableException); + if (interrupt != null) { + log.debug("Unable to create " + 
name + " as part of initializing " + entity + " (will submit deferred): " + e); + Entities.submit(entity, Tasks.create(name, runnable)); + } else { + throw Exceptions.propagate(e); + } + } + }; + List<EntityInitializer> initializers = Stream.concat(getGlobalDeploymentInitializers().stream(), spec.getInitializers().stream()) .collect(Collectors.toList()); for (EntityInitializer initializer: initializers) { - initializer.apply((EntityInternal)entity); + runNowOrLater.accept(""+initializer, () -> initializer.apply((EntityInternal)entity)); } for (EnricherSpec<?> enricherSpec : spec.getEnricherSpecs()) { - entity.enrichers().add(policyFactory.createEnricher(enricherSpec)); + runNowOrLater.accept(""+enricherSpec, () -> entity.enrichers().add(policyFactory.createEnricher(enricherSpec))); } for (PolicySpec<?> policySpec : spec.getPolicySpecs()) { - entity.policies().add(policyFactory.createPolicy(policySpec)); + runNowOrLater.accept(""+policySpec, () -> entity.policies().add(policyFactory.createPolicy(policySpec))); } for (Entity child: entity.getChildren()) { @@ -430,6 +460,7 @@ public class InternalEntityFactory extends InternalFactory { if (entity instanceof EntityPostInitializable) { ((EntityPostInitializable)entity).postInit(); } + } }).build()); } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowInitializer.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowInitializer.java index 0e80227a9b..d4411c407a 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowInitializer.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowInitializer.java @@ -111,9 +111,20 @@ public class WorkflowInitializer extends EntityInitializers.InitializerPatternWi if (delayDuration.isPositive()) Time.sleep(delayDuration); })); } - Task<Object> submitted = delayed ? 
DynamicTasks.queue(task.get()) : Entities.submit(entity, task.get()); - if (delayed) DynamicTasks.waitForLast(); - Object result = submitted.getUnchecked(); + Object result; + if (delayed) { + result = DynamicTasks.queue(task.get()); + DynamicTasks.waitForLast(); + } else { + //result = ((EntityInternal)entity).getExecutionContext().get(task.get()); + if (((EntityInternal) entity).getManagementSupport().wasDeployed()) { + result = Entities.submit(entity, task.get()).getUnchecked(); + } else { + // if initializer is pre-deployment, it should run in background, deferred until after management started + Entities.submit(entity, task.get()); + result = "<in progress>"; + } + } log.debug("Applied workflow initializer on " + entity + ", result: " + result); return result; } diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepResolution.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepResolution.java index e356ee1a56..f2d9555842 100644 --- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepResolution.java +++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowStepResolution.java @@ -30,6 +30,7 @@ import org.apache.brooklyn.api.mgmt.classloading.BrooklynClassLoadingContext; import org.apache.brooklyn.api.objs.BrooklynObject; import org.apache.brooklyn.core.entity.Entities; import org.apache.brooklyn.core.entity.EntityAdjuncts.EntityAdjunctProxyable; +import org.apache.brooklyn.core.entity.EntityInternal; import org.apache.brooklyn.core.mgmt.BrooklynTaskTags; import org.apache.brooklyn.core.mgmt.internal.AbstractManagementContext; import org.apache.brooklyn.core.objs.BrooklynObjectInternal; @@ -228,9 +229,9 @@ public class WorkflowStepResolution { if (entity==null) { def = converter.call(); } else { - // run in a task context if we can, to facilitate conversion and type lookup - def = Entities.submit(entity, - Tasks.builder().displayName("convert 
steps").body(converter).tag(BrooklynTaskTags.TRANSIENT_TASK_TAG).build()).getUnchecked(); + // run in a task context if we can, to facilitate conversion and type lookup; run in same thread, so we can do this at deploy time + def = ((EntityInternal)entity).getExecutionContext().get( + Tasks.builder().displayName("convert steps").body(converter).tag(BrooklynTaskTags.TRANSIENT_TASK_TAG).build()); } if (def instanceof WorkflowStepDefinition.WorkflowStepDefinitionWithSpecialDeserialization) { diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/concurrent/Locks.java b/utils/common/src/main/java/org/apache/brooklyn/util/concurrent/Locks.java index 6efef088c6..0cea384c17 100644 --- a/utils/common/src/main/java/org/apache/brooklyn/util/concurrent/Locks.java +++ b/utils/common/src/main/java/org/apache/brooklyn/util/concurrent/Locks.java @@ -26,8 +26,19 @@ import org.apache.brooklyn.util.exceptions.Exceptions; public class Locks { public static <T> T withLock(Lock lock, Callable<T> body) { + boolean locked = false; try { - lock.lockInterruptibly(); + if (Thread.currentThread().isInterrupted()) { + if (!lock.tryLock()) { + String msg = "Lock is held by another thread, and waiting not permitted: " + lock; + if (!lock.tryLock()) { + throw new InterruptedException(msg); + } + } + } else { + lock.lockInterruptibly(); + } + locked = true; } catch (InterruptedException e) { throw Exceptions.propagate(e); } @@ -36,7 +47,7 @@ public class Locks { } catch (Exception e) { throw Exceptions.propagate(e); } finally { - lock.unlock(); + if (locked) lock.unlock(); } }
