This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit ea6cc204e8290b438d197b3383e3e2d77eada9e1
Author: Alex Heneveld <[email protected]>
AuthorDate: Tue Jan 21 23:28:02 2025 +0000

    Add execution mgr support for global tags, including newTaskStartCallback 
tag
    
    and TaskEndCallback, using context "Auto-flags" set automatically on tasks
---
 .../util/core/task/BasicExecutionContext.java      | 26 +++++----
 .../util/core/task/BasicExecutionManager.java      | 49 ++++++++++++++---
 .../org/apache/brooklyn/util/core/task/Tasks.java  | 16 ++++--
 .../util/core/task/AutoFlagsCallbackTest.java      | 63 ++++++++++++++++++++++
 4 files changed, 130 insertions(+), 24 deletions(-)

diff --git 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index ff24b6fe2f..d61c0416f2 100644
--- 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -19,15 +19,7 @@
 package org.apache.brooklyn.util.core.task;
 
 import java.lang.reflect.Proxy;
-import java.util.ArrayDeque;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Deque;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -39,6 +31,7 @@ import java.util.concurrent.TimeoutException;
 import java.util.stream.Collectors;
 
 import com.google.common.annotations.Beta;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Function;
 import com.google.common.base.Supplier;
 import com.google.common.collect.Iterables;
@@ -355,7 +348,7 @@ public class BasicExecutionContext extends 
AbstractExecutionContext {
     @SuppressWarnings({ "unchecked", "rawtypes" })
     @Override
     protected <T> Task<T> submitInternal(Map<?,?> propertiesQ, final Object 
task) {
-        if (task instanceof TaskAdaptable<?> && !(task instanceof Task<?>)) 
+        if (task instanceof TaskAdaptable<?> && !(task instanceof Task<?>))
             return submitInternal(propertiesQ, 
((TaskAdaptable<?>)task).asTask());
         
         Map properties = MutableMap.copyOf(propertiesQ);
@@ -394,8 +387,8 @@ public class BasicExecutionContext extends 
AbstractExecutionContext {
             ((ScheduledTask)task).executionContext = this;
 
         } else {
-            final Object startCallback = 
properties.get("newTaskStartCallback");
-            properties.put("newTaskStartCallback", new Function<Task<?>, 
Void>() {
+            final Object startCallback = 
properties.get(BasicExecutionManager.TASK_START_CALLBACK_TAG);
+            properties.put(BasicExecutionManager.TASK_START_CALLBACK_TAG, new 
Function<Task<?>, Void>() {
                 @Override
                 public Void apply(Task<?> it) {
                     registerPerThreadExecutionContext();
@@ -404,8 +397,8 @@ public class BasicExecutionContext extends 
AbstractExecutionContext {
                 }
             });
 
-            final Object endCallback = properties.get("newTaskEndCallback");
-            properties.put("newTaskEndCallback", new Function<Task<?>, Void>() 
{
+            final Object endCallback = 
properties.get(BasicExecutionManager.TASK_END_CALLBACK_TAG);
+            properties.put(BasicExecutionManager.TASK_END_CALLBACK_TAG, new 
Function<Task<?>, Void>() {
                 @Override
                 public Void apply(Task<?> it) {
                     try {
@@ -601,4 +594,9 @@ public class BasicExecutionContext extends 
AbstractExecutionContext {
     public String toString() {
         return getClass().getSimpleName()+tags.stream().filter(t -> !(t 
instanceof ManagementContext)).collect(Collectors.toList());
     }
+
+    @VisibleForTesting
+    public Set<Object> getTagsLive() {
+        return tags;
+    }
 }
diff --git 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
index ead13412a8..7e522010ab 100644
--- 
a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
+++ 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java
@@ -122,6 +122,11 @@ public class BasicExecutionManager implements 
ExecutionManager {
         return PerThreadCurrentTaskHolder.perThreadCurrentTask;
     }
 
+    /** See {@link BasicExecutionManager#getAutoFlagsLive()} */
+    @VisibleForTesting @Beta public static final String 
TASK_START_CALLBACK_TAG = "newTaskStartCallback";
+    /** See {@link BasicExecutionManager#getAutoFlagsLive()} */
+    @VisibleForTesting @Beta public static final String TASK_END_CALLBACK_TAG 
= "newTaskEndCallback";
+
     /**
      * task names in this list will be print only in the trace level
      */
@@ -715,12 +720,33 @@ public class BasicExecutionManager implements 
ExecutionManager {
     }
 
     @Override
-    public <T> Task<T> submit(Map<?, ?> flags, TaskAdaptable<T> task) {
-        if (!(task instanceof Task))
-            task = task.asTask();
+    public <T> Task<T> submit(Map<?, ?> flags, TaskAdaptable<T> taskA) {
+        final Task<T> task = taskA instanceof Task ? (Task<T>) taskA : 
taskA.asTask();
+
+        if (!autoFlags.isEmpty()) {
+            MutableMap p2 = MutableMap.copyOf(flags);
+            autoFlags.forEach((k,v)->{
+                if (!p2.containsKey(k)) {
+                    p2.put(k, (Runnable) 
()->BasicExecutionManager.invokeCallback(v, (Task) task));
+                } else if (!Objects.equals(v, p2.get(k))) {
+                    // will usually have these from BEC to register per-thread
+                    if 
(BasicExecutionManager.TASK_START_CALLBACK_TAG.equals(k) || 
BasicExecutionManager.TASK_END_CALLBACK_TAG.equals(k)) {
+                        Object v2 = p2.get(k);
+                        p2.put(k, (Runnable) ()->{
+                            BasicExecutionManager.invokeCallback(v, (Task) 
task);
+                            BasicExecutionManager.invokeCallback(v2, (Task) 
task);
+                        });
+                    } else {
+                        throw new IllegalStateException("Cannot have autoFlags 
and task-specific flags with the same unexpected key '"+k+"': "+task);
+                    }
+                }
+            });
+            flags = p2;
+        }
+
         synchronized (task) {
             if (((TaskInternal<?>) task).getInternalFuture() != null) return 
(Task<T>) task;
-            return submitNewTask(flags, (Task<T>) task);
+            return submitNewTask(flags, task);
         }
     }
 
@@ -1219,7 +1245,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
             jitterThreadStart(task);
         }
         if (flags != null && !startingThisThreadMightEndElsewhere) {
-            invokeCallback(flags.get("newTaskStartCallback"), task);
+            invokeCallback(flags.get(TASK_START_CALLBACK_TAG), task);
         }
     }
 
@@ -1342,7 +1368,7 @@ public class BasicExecutionManager implements 
ExecutionManager {
             }
 
             if (flags != null && taskWasSubmittedAndNotYetEnded && 
startedGuaranteedToEndInSameThreadAndEndingSameThread) {
-                invokeCallback(flags.get("newTaskEndCallback"), task);
+                invokeCallback(flags.get(TASK_END_CALLBACK_TAG), task);
             }
             if (task.getEndTimeUtc() > 0) {
                 if (taskWasSubmittedAndNotYetEnded) {
@@ -1494,4 +1520,15 @@ public class BasicExecutionManager implements 
ExecutionManager {
         log.info("Setting task startup jittering maximum delay to " + 
jitterThreadsMaxDelay);
     }
 
+
+    final Map autoFlags = MutableMap.of();
+    /** Autoflags are tags which are automatically applied to tasks in this 
execution context.
+     *  Intended for use during testing, such as inserting arbitrarily delays.
+     *  Not guaranteed to work in all situations (but plenty for inserting 
chaos monkey delays).
+     *  Often used with {@link BasicExecutionManager#TASK_START_CALLBACK_TAG} 
and {@link BasicExecutionManager#TASK_END_CALLBACK_TAG}.
+     */
+    @VisibleForTesting @Beta
+    public Map getAutoFlagsLive() {
+        return autoFlags;
+    }
 }
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java 
b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
index 39765c85cd..5442d31c0c 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java
@@ -44,10 +44,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.PrintWriter;
 import java.io.Writer;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.ExecutionException;
@@ -126,6 +123,17 @@ public class Tasks {
         if (proxy==null || proxy.equals(task)) return task;
         return getFinalProxyTarget(proxy);
     }
+    /** true if this is a task, and it is not a proxy task, i.e. it equals its 
{@link TaskInternal#getProxyTarget()}.
+     * (proxy tasks are wrappers and other things which used by framework 
rather than a real user-supplied task.)  */
+    public static boolean isNonProxyTask(Task<?> task) {
+        if (task==null) return false;
+        return Objects.equals(task, getFinalProxyTarget(task));
+    }
+    /** as {@link #isNonProxyTask(Task)} for the actual task in the current 
thread
+     * (using {@link BasicExecutionManager#getPerThreadCurrentTask()} not 
{@link Tasks#current()} which follows proxies */
+    public static boolean isNonProxyTask() {
+        return 
isNonProxyTask(BasicExecutionManager.getPerThreadCurrentTask().get());
+    }
     
     /** creates a {@link ValueResolver} instance which allows significantly 
more customization than
      * the various {@link #resolveValue(Object, Class, ExecutionContext)} 
methods here */
diff --git 
a/core/src/test/java/org/apache/brooklyn/util/core/task/AutoFlagsCallbackTest.java
 
b/core/src/test/java/org/apache/brooklyn/util/core/task/AutoFlagsCallbackTest.java
new file mode 100644
index 0000000000..1064b545d8
--- /dev/null
+++ 
b/core/src/test/java/org/apache/brooklyn/util/core/task/AutoFlagsCallbackTest.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.util.core.task;
+
+import org.apache.brooklyn.core.entity.Entities;
+import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport;
+import org.apache.brooklyn.test.Asserts;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.atomic.AtomicReference;
+
+public class AutoFlagsCallbackTest extends BrooklynAppUnitTestSupport {
+
+    private static final Logger log = 
LoggerFactory.getLogger(AutoFlagsCallbackTest.class);
+
+    @Test
+    public void testCalledInOrder() {
+        AtomicReference<String> state = new AtomicReference<>("end");
+        ((BasicExecutionManager) 
mgmt.getExecutionManager()).getAutoFlagsLive().put(BasicExecutionManager.TASK_START_CALLBACK_TAG,
 (Runnable) () -> {
+            if (Tasks.isNonProxyTask()) {
+                // above necessary to prevent this running from
+                log.info("STARTING TASK: " + Tasks.current());
+                Asserts.assertTrue(state.compareAndSet("end", "start"));
+            } else {
+                // wrapper task eg DST
+            }
+        });
+        ((BasicExecutionManager) 
mgmt.getExecutionManager()).getAutoFlagsLive().put(BasicExecutionManager.TASK_END_CALLBACK_TAG,
 (Runnable) () -> {
+            if (Tasks.isNonProxyTask()) {
+                state.compareAndSet("start-callback", "main");
+                log.info("ENDING TASK: " + Tasks.current());
+                Asserts.assertTrue(state.compareAndSet("start", "end"));
+            } else {
+                // wrapper task eg DST
+            }
+        });
+        Entities.submit(app, Tasks.create("test1", () -> {
+            log.info("running test 1" + " / " + Tasks.current().getId());
+            Asserts.assertEquals(state.get(), "start");
+        })).getUnchecked();
+
+        Asserts.assertEquals(state.get(), "end");
+    }
+
+}

Reply via email to