This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 665dc5331a7ce9b9e92087dd3ee2b9e75e5acddc
Author: Alex Heneveld <[email protected]>
AuthorDate: Wed Jan 22 21:14:32 2025 +0000

    Improve waiting and recalculation for racing in-flight updates to service-up,
    service-problems, and expected-state sensors
    
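    When setting the expected state to RUNNING, setExpectedState now waits a while,
    because it is called explicitly once children's tasks have finished (so the
    children should already have set their own expected state), but the children
    might not yet have delivered all their sensor updates to the parent.
    It stops waiting as soon as the entity is up with no problems, or once there
    are no unrelated sensor-subscription deliveries still in flight.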
---
 .../core/entity/lifecycle/ServiceStateLogic.java   | 225 ++++++++++++---------
 .../blueprints/ChildrenQuorateRaceTest.java        | 205 +++++++++++++++++++
 2 files changed, 340 insertions(+), 90 deletions(-)

diff --git 
a/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java
 
b/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java
index 9310a44b99..baf7c644bc 100644
--- 
a/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java
+++ 
b/core/src/main/java/org/apache/brooklyn/core/entity/lifecycle/ServiceStateLogic.java
@@ -34,6 +34,7 @@ import org.apache.brooklyn.api.effector.Effector;
 import org.apache.brooklyn.api.entity.Entity;
 import org.apache.brooklyn.api.entity.EntityLocal;
 import org.apache.brooklyn.api.entity.Group;
+import org.apache.brooklyn.api.mgmt.Task;
 import org.apache.brooklyn.api.sensor.AttributeSensor;
 import org.apache.brooklyn.api.sensor.Enricher;
 import org.apache.brooklyn.api.sensor.EnricherSpec;
@@ -51,9 +52,9 @@ import org.apache.brooklyn.core.entity.Attributes;
 import org.apache.brooklyn.core.entity.Entities;
 import org.apache.brooklyn.core.entity.EntityAdjuncts;
 import org.apache.brooklyn.core.entity.EntityInternal;
-import org.apache.brooklyn.core.entity.EntityPredicates;
 import org.apache.brooklyn.core.entity.lifecycle.Lifecycle.Transition;
 import org.apache.brooklyn.core.entity.trait.Startable;
+import org.apache.brooklyn.core.mgmt.BrooklynTaskTags;
 import org.apache.brooklyn.core.sensor.BasicSensorEvent;
 import org.apache.brooklyn.enricher.stock.AbstractMultipleSensorAggregator;
 import org.apache.brooklyn.enricher.stock.Enrichers;
@@ -64,6 +65,7 @@ import org.apache.brooklyn.util.collections.MutableMap;
 import org.apache.brooklyn.util.collections.MutableSet;
 import org.apache.brooklyn.util.collections.QuorumCheck;
 import org.apache.brooklyn.util.collections.QuorumCheck.QuorumChecks;
+import org.apache.brooklyn.util.core.task.Tasks;
 import org.apache.brooklyn.util.core.task.ValueResolver;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.guava.Functionals;
@@ -164,8 +166,8 @@ public class ServiceStateLogic {
     }
     
     private static void setExpectedState(Entity entity, Lifecycle state, 
boolean waitBrieflyForServiceUpIfRunning) {
-        if (waitBrieflyForServiceUpIfRunning) {
-            recomputeIfIssueWhenBecomingExpectedRunning("setting expected 
state", entity, state);
+        if (waitBrieflyForServiceUpIfRunning && state == Lifecycle.RUNNING) {
+            recomputeWantingNoIssuesWhenBecomingExpectedRunning("setting 
expected state", entity, RecomputeWaitMode.LONG);
         }
         
((EntityInternal)entity).sensors().set(Attributes.SERVICE_STATE_EXPECTED, new 
Lifecycle.Transition(state, new Date()));
 
@@ -181,67 +183,97 @@ public class ServiceStateLogic {
         return expected.getState();
     }
 
-    public static void recomputeIfIssueWhenBecomingExpectedRunning(String 
when, Entity entity, Lifecycle state) {
-        if (!Entities.isManagedActive(entity) || state!=Lifecycle.RUNNING) {
-            return;
-        } else {
-            Map<String, Object> problems = 
entity.getAttribute(SERVICE_PROBLEMS);
-            boolean noProblems = problems == null || problems.isEmpty();
-            Boolean up = entity.getAttribute(Attributes.SERVICE_UP);
-            if (Boolean.TRUE.equals(up) && noProblems) {
-                return;
-            } else {
-                log.debug("Service not up pre-check, up="+up+" and 
problems="+problems+" when setting "+ state +" (when "+when+") on " + entity+"; 
possibly just needs a recompute; doing recompute now");
-
-                try {
-                    Iterables.filter(entity.enrichers(), x -> x instanceof 
ComputeServiceIndicatorsFromChildrenAndMembers).forEach(
-                            x -> {
-                                ComputeServiceIndicatorsFromChildrenAndMembers 
mx = (ComputeServiceIndicatorsFromChildrenAndMembers) x;
-                                if (mx.isRunning()) {
-                                    log.debug("Service not up pre-check 
recompute rerunning "+mx);
-                                    mx.onUpdated();
-                                }
-                            }
-                    );
-
-                    Map<String, Object> notUpIndicators = 
entity.sensors().get(Attributes.SERVICE_NOT_UP_INDICATORS);
-                    if (notUpIndicators == null || notUpIndicators.isEmpty()) {
-                        Maybe<Enricher> css = 
EntityAdjuncts.tryFindWithUniqueTag(entity.enrichers(), 
ServiceNotUpLogic.DEFAULT_ENRICHER_UNIQUE_TAG);
-                        if (css.isPresent()) {
-                            SensorEvent<Map<String, Object>> pseudoEvent = new 
BasicSensorEvent<>(Attributes.SERVICE_NOT_UP_INDICATORS, entity, 
notUpIndicators);
-                            ((SensorEventListener) 
css.get()).onEvent(pseudoEvent);
-                            up = entity.getAttribute(Attributes.SERVICE_UP);
-                            log.debug("Service not up pre-check recompute ran, 
service.isUp="+up+" after: "+css);
-                        }
-                    } else {
-                        log.debug("Service not up pre-check recompute not 
running because not up indicators are now: " + notUpIndicators);
-                    }
-                } catch (Exception e) {
-                    Exceptions.propagateIfFatal(e);
-                    log.debug("Service is not up when setting "+ state +" on " 
+ entity+", and attempt to run standard prep workflows failed with exception; 
will wait and see if service up clears, but for reference the error is: "+e);
-                    if (log.isTraceEnabled()) log.trace("Exception trace", e);
-                }
-
-                if (!Boolean.TRUE.equals(up) && 
Entities.isManagedActive(entity)) {
-                    // pause briefly to allow any recent problem-clearing 
processing to complete;
-                    // should be less necessary now that the code above 
explicitly triggers any not-up indicators
-                    Stopwatch timer = Stopwatch.createStarted();
-                    boolean nowUp = Repeater.create()
-                            .every(ValueResolver.REAL_QUICK_PERIOD)
-                            .limitTimeTo(ValueResolver.PRETTY_QUICK_WAIT)
-                            .until(entity, 
EntityPredicates.attributeEqualTo(Attributes.SERVICE_UP, true))
-                            .run();
-                    if (nowUp) {
-                        log.debug("Had to wait " + Duration.of(timer) + " for 
" + entity + " " + Attributes.SERVICE_UP + " to be true before setting " + 
state);
-                    } else {
-                        if (Entities.isManagedActive(entity)) {
-                            log.warn("Service is not up when "+when+" on " + 
entity + "; delayed " + Duration.of(timer) + " "
-                                    + "but " + Attributes.SERVICE_UP + " did 
not recover from " + up + "; not-up-indicators=" + 
entity.getAttribute(Attributes.SERVICE_NOT_UP_INDICATORS));
+    private static boolean isEmptyOrNull(Map x) {
+        return x==null || x.isEmpty();
+    }
+    private enum RecomputeWaitMode { LONG, SHORT, RECOMPUTE_ONLY, NONE }
+    private static boolean 
recomputeWantingNoIssuesWhenBecomingExpectedRunning(String when, Entity entity, 
RecomputeWaitMode canWait) {
+        if (!Entities.isManagedActive(entity)) return true;
+
+        Map<String, Object> problems = entity.getAttribute(SERVICE_PROBLEMS);
+        Boolean up = entity.getAttribute(Attributes.SERVICE_UP);
+        if (Boolean.TRUE.equals(up) && isEmptyOrNull(problems)) return true;
+        if (canWait==RecomputeWaitMode.NONE) return false;
+
+        log.debug("Recompute indicated setting RUNNING ("+when+") on service 
issue; up="+up+", problems="+problems+",on " + entity + " (mode "+canWait+")");
+        try {
+            Iterables.filter(entity.enrichers(), x -> x instanceof 
ComputeServiceIndicatorsFromChildrenAndMembers).forEach(
+                    x -> {
+                        ComputeServiceIndicatorsFromChildrenAndMembers mx = 
(ComputeServiceIndicatorsFromChildrenAndMembers) x;
+                        if (mx.isRunning()) {
+                            log.debug("Recompute rerunning "+mx);
+                            mx.onUpdated();
+                            log.debug("Recomputed values now: problems="+
+                                    entity.sensors().get(SERVICE_PROBLEMS) + 
", not_up_indicators=" + entity.sensors().get(SERVICE_NOT_UP_INDICATORS) );
                         }
                     }
+            );
+
+            Map<String, Object> notUpIndicators = 
entity.sensors().get(Attributes.SERVICE_NOT_UP_INDICATORS);
+            if (notUpIndicators == null || notUpIndicators.isEmpty()) {
+                Maybe<Enricher> css = 
EntityAdjuncts.tryFindWithUniqueTag(entity.enrichers(), 
ServiceNotUpLogic.DEFAULT_ENRICHER_UNIQUE_TAG);
+                if (css.isPresent()) {
+                    SensorEvent<Map<String, Object>> pseudoEvent = new 
BasicSensorEvent<>(Attributes.SERVICE_NOT_UP_INDICATORS, entity, 
notUpIndicators);
+                    ((SensorEventListener) css.get()).onEvent(pseudoEvent);
+                    up = entity.getAttribute(Attributes.SERVICE_UP);
+                    log.debug("Recompute for service indicators now gives: 
service.isUp="+up+" after: "+css);
                 }
+            } else {
+                log.debug("Recomputed not_up_indicators now: " + 
notUpIndicators);
             }
-        }
+        } catch (Exception e) {
+            Exceptions.propagateIfFatal(e);
+            if (!Entities.isManagedActive(entity)) return true;
+            log.debug("Service is not up when setting RUNNING on " + entity+", 
and attempt to run standard prep workflows failed with exception: "+e);
+            if (log.isTraceEnabled()) log.trace("Exception trace", e);
+        }
+
+        if (recomputeWantingNoIssuesWhenBecomingExpectedRunning(when+" (after 
recompute)", entity, RecomputeWaitMode.NONE)) return true;
+        if (canWait==RecomputeWaitMode.RECOMPUTE_ONLY) return false;
+
+        assert canWait==RecomputeWaitMode.LONG || 
canWait==RecomputeWaitMode.SHORT;
+        // repeat with pauses briefly to allow any recent in-flight 
service-ups or state changes or problem-clearing processing to complete;
+        // should only be necessary when using setExpectedState, where canWait 
is true, unless the actual is used without expected;
+        // but leaving it as it used to be to minimise surprises for now;
+        // probably also no need to recompute above either with this (could be 
tidied up a lot)
+        Stopwatch timer = Stopwatch.createStarted();
+        final Duration LONG_WAIT = Duration.THIRTY_SECONDS;  // this should be 
enough time for in-flight racing activities, even when slow and heavily 
contended
+        Task current = Tasks.current();
+        boolean notTimedout = Repeater.create()
+                .every(ValueResolver.REAL_QUICK_PERIOD)
+                .limitTimeTo(canWait==RecomputeWaitMode.LONG ? LONG_WAIT : 
ValueResolver.PRETTY_QUICK_WAIT)
+                .until(() -> {
+                    Set<Task<?>> tasksHere = 
BrooklynTaskTags.getTasksInEntityContext(
+                            ((EntityInternal) 
entity).getManagementContext().getExecutionManager(), entity);
+                    java.util.Optional<Task<?>> unrelatedSubmission = 
tasksHere.stream()
+                            .filter(t ->
+                                    !t.isDone() &&
+                                    BrooklynTaskTags.hasTag(t, 
BrooklynTaskTags.SENSOR_TAG) &&
+                                    !Tasks.isAncestor(current, anc -> 
Objects.equal(anc, t)))
+                            .findAny();
+                    // abort when self and children have no unrelated 
submission tasks, or if something is known to be on_fire;
+                    // otherwise give it LONG_WAIT (arbitrary but inoffensive) 
to settle in that case (things running very slow)
+                    if (!unrelatedSubmission.isPresent()) return true;
+                    return 
recomputeWantingNoIssuesWhenBecomingExpectedRunning(when+" (recheck after 
"+Duration.of(timer.elapsed())+")", entity, RecomputeWaitMode.NONE);
+                })
+                .run();
+
+        boolean nowUp = 
recomputeWantingNoIssuesWhenBecomingExpectedRunning(when+" (recheck after "+
+                (notTimedout ? "completion" : "timeout")+", after 
"+Duration.of(timer.elapsed())+")", entity, RecomputeWaitMode.RECOMPUTE_ONLY);
+        if (nowUp) {
+            log.debug("Recompute determined "+entity+" is up, after " + 
Duration.of(timer));
+            return true;
+        }
+
+        if (!Entities.isManagedActive(entity)) return true;
+
+        log.warn("Service is not up when "+when+" on " + entity + "; delayed " 
+ Duration.of(timer) + " "
+                + "but: " + Attributes.SERVICE_UP + "=" + 
entity.getAttribute(Attributes.SERVICE_UP) + ", "
+                + "not-up-indicators=" + 
entity.getAttribute(Attributes.SERVICE_NOT_UP_INDICATORS)
+                + ", problems=" + 
entity.getAttribute(Attributes.SERVICE_PROBLEMS)
+        );
+        // slight chance above has updated since the check, but previuos log 
messages should make clear what happened
+        return false;
     }
 
     public static Lifecycle getActualState(Entity entity) {
@@ -309,7 +341,7 @@ public class ServiceStateLogic {
                     .defaultValue(Entities.UNCHANGED).apply(input.getValue());
             if (!Objects.equal(result, Entities.UNCHANGED)) {
                 Boolean prevValue = entity.sensors().get(SERVICE_UP);
-                if (!Objects.equal(result, prevValue)) {
+                if (!Objects.equal(result, prevValue) && (prevValue!=null || 
result!=Boolean.TRUE)) {
                     log.debug("Enricher '" + DEFAULT_ENRICHER_UNIQUE_TAG + "' 
for " + entity + " determined service up changed from " + prevValue + " to " + 
result + " due to indicators: " + input);
                 }
             }
@@ -371,39 +403,52 @@ public class ServiceStateLogic {
             }
         }
 
+        transient int recomputeDepth=0;
         protected Maybe<Lifecycle> 
computeActualStateWhenExpectedRunning(SensorEvent<Object> event) {
-            int count=0;
-            while (true) {
-                Map<String, Object> problems = 
entity.getAttribute(SERVICE_PROBLEMS);
-                boolean noProblems = problems == null || problems.isEmpty();
+            if (recomputeDepth>0) {
+                return Maybe.absent("Skipping actual state computation because 
already computing");
+            }
+            try {
+                while (true) {
+                    Map<String, Object> problems = 
entity.getAttribute(SERVICE_PROBLEMS);
+                    boolean noProblems = problems == null || 
problems.isEmpty();
 
-                Boolean serviceUp = entity.getAttribute(SERVICE_UP);
+                    Boolean serviceUp = entity.getAttribute(SERVICE_UP);
 
-                if (Boolean.TRUE.equals(serviceUp) && noProblems) {
-                    return Maybe.of(Lifecycle.RUNNING);
-                } else {
-                    if (!Entities.isManagedActive(entity)) {
-                        return Maybe.absent("entity not managed active");
-                    }
-                    if 
(!Lifecycle.ON_FIRE.equals(entity.getAttribute(SERVICE_STATE_ACTUAL))) {
-                        boolean waitable = count==0;
-                        waitable = waitable && event!=null && 
!Attributes.SERVICE_UP.equals(event.getSensor());
-                        if (waitable) {
-                            // very occasional race here; might want to give a 
grace period if entity has just transitioned; allow children to catch up
-                            // we probably did the wait when expected running, 
but possibly in some cases we don't (seen once, 2024-07, not reproduced)
-                            log.debug("Entity "+entity+" would be on-fire due 
to problems (up="+serviceUp+", problems="+problems+"), will attempt re-check");
-                            
recomputeIfIssueWhenBecomingExpectedRunning("computing actual state", entity, 
Lifecycle.RUNNING);
-                            count++;
-                            continue;
+                    if (Boolean.TRUE.equals(serviceUp) && noProblems) {
+                        return Maybe.of(Lifecycle.RUNNING);
+                    } else {
+                        if (!Entities.isManagedActive(entity)) {
+                            return Maybe.absent("entity not managed active");
+                        }
+                        // with delay when writing expected state, it should 
not be necessary to have a wait/retry
+                        if 
(!Lifecycle.ON_FIRE.equals(entity.getAttribute(SERVICE_STATE_ACTUAL))) {
+                            boolean retryable = recomputeDepth == 0;
+                            // allow recompute if event is null (intermediate 
recomputation?)
+                            // but need to prevent
+                            retryable = retryable && (event == null || 
!Attributes.SERVICE_UP.equals(event.getSensor()));
+                            if (retryable) {
+                                recomputeDepth++;
+                                // occasional race here; might want to give a 
grace period if entity has just transitioned; allow children to catch up;
+                                // we should have done the wait when expected 
running, but possibly it hasn't caught up yet
+                                log.debug("Entity " + entity + " would be 
computed on-fire due to problems (up=" + serviceUp + ", problems=" + problems + 
"), will attempt re-check");
+                                
recomputeWantingNoIssuesWhenBecomingExpectedRunning("computing actual state", 
entity,
+                                        RecomputeWaitMode.SHORT  // NONE would 
probalby be fine here, with none of the recompute above,
+                                        // at least whenever expected state is 
used, due to how it waits now; but leaving it as is until more confirmation
+                                );
+                                continue;
+                            }
                         }
+                        BrooklynLogging.log(log, 
BrooklynLogging.levelDependingIfReadOnly(entity, LoggingLevel.WARN, 
LoggingLevel.TRACE, LoggingLevel.DEBUG),
+                                "Setting " + entity + " " + Lifecycle.ON_FIRE 
+ " due to problems when expected running, " +
+                                        "trigger=" + event + ", " +
+                                        "up=" + serviceUp + ", " +
+                                        (noProblems ? "not-up-indicators: " + 
entity.getAttribute(SERVICE_NOT_UP_INDICATORS) : "problems: " + problems));
+                        return Maybe.of(Lifecycle.ON_FIRE);
                     }
-                    BrooklynLogging.log(log, 
BrooklynLogging.levelDependingIfReadOnly(entity, LoggingLevel.WARN, 
LoggingLevel.TRACE, LoggingLevel.DEBUG),
-                            "Setting " + entity + " " + Lifecycle.ON_FIRE + " 
due to problems when expected running, " +
-                                    "trigger="+event+", "+
-                                    "up=" + serviceUp + ", " +
-                                    (noProblems ? "not-up-indicators: " + 
entity.getAttribute(SERVICE_NOT_UP_INDICATORS) : "problems: " + problems));
-                    return Maybe.of(Lifecycle.ON_FIRE);
                 }
+            } finally {
+                recomputeDepth = 0;
             }
         }
 
@@ -421,7 +466,7 @@ public class ServiceStateLogic {
                     return Maybe.of(Lifecycle.STOPPED);
                 } else {
                     BrooklynLogging.log(log, 
BrooklynLogging.levelDependingIfReadOnly(entity, LoggingLevel.WARN, 
LoggingLevel.TRACE, LoggingLevel.DEBUG),
-                        "Setting "+entity+" "+Lifecycle.ON_FIRE+" due to 
problems when expected "+stateTransition+" / up="+up+": "+problems);
+                        "Computed "+entity+" "+Lifecycle.ON_FIRE+" due to 
problems when expected "+stateTransition+" / up="+up+": "+problems);
                     return Maybe.of(Lifecycle.ON_FIRE);
                 }
             } else {
@@ -549,7 +594,7 @@ public class ServiceStateLogic {
         @SuppressWarnings("serial")
         public static final ConfigKey<Set<Lifecycle>> 
IGNORE_ENTITIES_WITH_THESE_SERVICE_STATES = ConfigKeys.newConfigKey(new 
TypeToken<Set<Lifecycle>>() {},
             
"enricher.service_state.children_and_members.ignore_entities.service_state_values",
-            "Service states (including null) which indicate an entity should 
be ignored when looking at children service states; anything apart from RUNNING 
not in this list will be treated as not healthy (by default just ON_FIRE will 
mean not healthy)",
+            "Service states of children (including null) which indicate they 
should be ignored when looking at children service states; anything apart from 
RUNNING not in this list will be treated as not healthy (by default just 
ON_FIRE will mean not healthy)",
             
MutableSet.<Lifecycle>builder().addAll(Lifecycle.values()).add(null).remove(Lifecycle.RUNNING).remove(Lifecycle.ON_FIRE).build().asUnmodifiable());
 
         protected String getKeyForMapSensor() {
diff --git 
a/launcher/src/test/java/org/apache/brooklyn/launcher/blueprints/ChildrenQuorateRaceTest.java
 
b/launcher/src/test/java/org/apache/brooklyn/launcher/blueprints/ChildrenQuorateRaceTest.java
new file mode 100644
index 0000000000..59e5dcf0a6
--- /dev/null
+++ 
b/launcher/src/test/java/org/apache/brooklyn/launcher/blueprints/ChildrenQuorateRaceTest.java
@@ -0,0 +1,205 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.launcher.blueprints;
+
+import com.google.common.collect.ImmutableSet;
+import org.apache.brooklyn.api.entity.Entity;
+import org.apache.brooklyn.api.entity.EntitySpec;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.config.ConfigKey;
+import org.apache.brooklyn.core.entity.Attributes;
+import org.apache.brooklyn.core.entity.Dumper;
+import org.apache.brooklyn.core.entity.EntityAsserts;
+import org.apache.brooklyn.core.entity.lifecycle.Lifecycle;
+import org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic;
+import org.apache.brooklyn.core.mgmt.EntityManagementUtils;
+import org.apache.brooklyn.core.sensor.Sensors;
+import org.apache.brooklyn.core.workflow.WorkflowBasicTest;
+import org.apache.brooklyn.entity.stock.BasicApplication;
+import org.apache.brooklyn.entity.stock.BasicStartable;
+import org.apache.brooklyn.entity.stock.WorkflowStartable;
+import org.apache.brooklyn.util.collections.MutableList;
+import org.apache.brooklyn.util.collections.MutableMap;
+import org.apache.brooklyn.util.collections.QuorumCheck;
+import org.apache.brooklyn.util.core.task.BasicExecutionManager;
+import org.apache.brooklyn.util.core.task.Tasks;
+import org.apache.brooklyn.util.time.Duration;
+import org.apache.brooklyn.util.time.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+import static 
org.apache.brooklyn.core.entity.lifecycle.ServiceStateLogic.ComputeServiceIndicatorsFromChildrenAndMembers.UP_QUORUM_CHECK;
+
+/** This does rebind. See SimpleBlueprintNonRebindTest for an example with 
rebind disabled. */
+public class ChildrenQuorateRaceTest extends AbstractBlueprintTest {
+
+    private static final Logger log = 
LoggerFactory.getLogger(ChildrenQuorateRaceTest.class);
+
+    @Override
+    protected ManagementContext decorateManagementContext(ManagementContext 
mgmt) {
+        mgmt = super.decorateManagementContext(mgmt);
+        // make workflow step types available
+        WorkflowBasicTest.addWorkflowStepTypes(mgmt);
+        return mgmt;
+    }
+
+    @Override
+    protected boolean isViewerEnabled() {
+        return true;
+    }
+
+    @Override
+    protected boolean isUsingNewViewerForRebind() {
+        return true;
+    }
+
+    public Duration randomNormalishJitter(Duration base, Duration jitterMax) {
+        boolean increase = base.isLongerThan(Duration.ZERO) ? Math.random() > 
0.5 : true;
+        if (!increase && jitterMax.isLongerThan(base)) jitterMax = base;
+        Duration jitterActual = jitterMax.multiply(Math.random() * 
Math.random());
+        if (!increase) jitterActual = jitterActual.multiply(-1);
+        jitterActual = Duration.millis(jitterActual.toMilliseconds());
+        return base.add(jitterActual);
+    }
+    public Duration randomNormalishJitter(String base, String jitterMax) {
+        return randomNormalishJitter(Duration.of(base), 
Duration.of(jitterMax));
+    }
+
+    public Duration taskStartStopDelay() {
+        return randomNormalishJitter("100ms", "1s");
+    }
+    public Duration taskWorkflowDelay() {
+        return Duration.of("10s").add(randomNormalishJitter("0", "5s"));
+    }
+
+    @Test(groups="Integration") // because slow
+    public void testRace() throws Exception {
+        BasicApplication app = EntityManagementUtils.createUnstarted(mgmt, 
EntitySpec.create(BasicApplication.class));
+        int N=5;
+        // at N=5 we see: Recompute determined 
BasicStartableImpl{id=k7meoqgkqg} is up, after 17s 391ms
+        for (int i=0; i<N; i++) app.addChild(descendantSpec(N, N));
+
+//        Dumper.dumpInfo(app);
+
+        decorateExecutionWithChaosMonkeySleepAndLog((BasicExecutionManager) 
mgmt.getExecutionManager(),
+                () -> onTaskStart(this::taskStartStopDelay), () -> 
onTaskEnd(this::taskStartStopDelay));
+
+        recurse(app, Entity::getChildren, this::initStrict);
+        app.start(null);
+
+        // stick a breakpoint on the following line (make sure it is 
thread-only, not all-threads!)
+        // then connect a UI eg brooklyn-ui/app-inspector `make dev` to the 
API endpoint used
+        decorateExecutionWithChaosMonkeySleepAndLog((BasicExecutionManager) 
mgmt.getExecutionManager(), null, null);
+        EntityAsserts.assertAttributeEquals(app, 
Attributes.SERVICE_STATE_ACTUAL, Lifecycle.RUNNING);
+    }
+
+    private <T> void recurse(T root, Function<T,? extends Iterable<T>> next, 
Consumer<T> action) {
+        action.accept(root);
+        next.apply(root).forEach(child -> recurse(child, next, action));
+    }
+
+    private void initStrict(Entity target) {
+        if (target.getChildren().isEmpty()) return;
+
+        // some things set this, but not in this test
+//        ServiceStateLogic.ServiceNotUpLogic.updateNotUpIndicator(target, 
Attributes.SERVICE_STATE_ACTUAL,
+//                "created but not yet started, at "+Time.makeDateString());
+
+        ServiceStateLogic.newEnricherFromChildren().checkChildrenAndMembers()
+                .uniqueTag("children-service-up-contraindicators")
+                .requireUpChildren(QuorumCheck.QuorumChecks.allAndAtLeastOne())
+                
.configure(ServiceStateLogic.ComputeServiceIndicatorsFromChildrenAndMembers.DERIVE_SERVICE_PROBLEMS,
 false)
+                
.configure(ServiceStateLogic.ComputeServiceIndicatorsFromChildrenAndMembers.IGNORE_ENTITIES_WITH_THESE_SERVICE_STATES,
+                        // override _not_ to ignore "starting" children
+                        // (only applies for collections, which shouldn't say 
up unless starting children are up;
+                        // other nodes default to an "alwaysHealthy" quorum so 
won't be affected,
+                        // unless they have a custom up-ness quorum-check, in 
which case their upness will also be a function of starting children;
+                        // means eg restarting a child of a collection will 
make the collection say service down, which is wanted for it;
+                        // ideally this might be configurable for other nodes 
or depending on state of collection,
+                        // or we can tie in with state derivation ... but that 
can be an enhancement for another day)
+                        ImmutableSet.of(
+                                //Lifecycle.STARTING, Lifecycle.STARTED,
+                                //Lifecycle.STOPPING, Lifecycle.STOPPED,
+                                Lifecycle.DESTROYED))
+                .addTo(target);
+
+        ServiceStateLogic.newEnricherFromChildren().checkChildrenAndMembers()
+                .uniqueTag("children-service-problems-indicators")
+                
.requireRunningChildren(QuorumCheck.QuorumChecks.allAndAtLeastOne())
+                
.configure(ServiceStateLogic.ComputeServiceIndicatorsFromChildrenAndMembers.DERIVE_SERVICE_NOT_UP,
 false)
+                .addTo(target);
+
+        
target.enrichers().add(ServiceStateLogic.newEnricherForServiceStateFromProblemsAndUp());
+        
target.enrichers().add(ServiceStateLogic.ServiceNotUpLogic.newEnricherForServiceUpIfNotUpIndicatorsEmpty());
+    }
+
+    private EntitySpec descendantSpec(int... sizes) {
+        if (sizes.length==0) return
+                EntitySpec.create(WorkflowStartable.class)
+                        .configure((ConfigKey<Object>)(ConfigKey) 
WorkflowStartable.START_WORKFLOW, MutableMap.of(
+                                "steps", MutableList.of("sleep 
"+taskWorkflowDelay().toMilliseconds()+"ms")))
+                        .configure((ConfigKey<Object>)(ConfigKey) 
WorkflowStartable.STOP_WORKFLOW, MutableMap.of(
+                                "steps", MutableList.of("let x = 0")));
+        EntitySpec<BasicStartable> result = 
EntitySpec.create(BasicStartable.class);
+        int[] nextSizes = new int[sizes.length-1];
+        for (int i=1; i<sizes.length; i++) nextSizes[i-1] = sizes[i];
+        for (int i=0; i<sizes[0]; i++) result.child(descendantSpec(nextSizes));
+        return result;
+    }
+
+    private void 
decorateExecutionWithChaosMonkeySleepAndLog(BasicExecutionManager ctx, Runnable 
onTaskStart, Runnable onTaskEnd) {
+        
ctx.getAutoFlagsLive().put(BasicExecutionManager.TASK_START_CALLBACK_TAG, 
onTaskStart);
+        
ctx.getAutoFlagsLive().put(BasicExecutionManager.TASK_END_CALLBACK_TAG, 
onTaskEnd);
+    }
+
+    private boolean includeForTaskDecoration(Task t) {
+        if (t==null) return false;
+        if (!Tasks.isNonProxyTask(t)) return false;
+        if (t.getDisplayName().equals("periodic-persister"))
+            return false;
+        return true;
+    }
+
+    private void onTaskStart(Supplier<Duration> delay) {
+        Task<?> current = 
BasicExecutionManager.getPerThreadCurrentTask().get();
+        if (includeForTaskDecoration(current)) {
+            // above necessary to prevent this running from
+            Duration d = delay.get();
+//            log.info("TASK " + current+" delay "+d+" before start");
+            Time.sleep(d);
+//            log.info("TASK " + current+" start");
+        }
+    }
+    private void onTaskEnd(Supplier<Duration> delay) {
+        Task<?> current = 
BasicExecutionManager.getPerThreadCurrentTask().get();
+        if (includeForTaskDecoration(current)) {
+            // above necessary to prevent this running from
+//            log.info("TASK " + current+" delay before end");
+            Time.sleep(delay.get());
+//            log.info("TASK " + current+" end");
+        }
+    }
+
+}

Reply via email to