This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit ea6cc204e8290b438d197b3383e3e2d77eada9e1 Author: Alex Heneveld <[email protected]> AuthorDate: Tue Jan 21 23:28:02 2025 +0000 Add execution mgr support for global tags, including newTaskStartCallback tag and TaskEndCallback, using context "Auto-flags" set automatically on tasks --- .../util/core/task/BasicExecutionContext.java | 26 +++++---- .../util/core/task/BasicExecutionManager.java | 49 ++++++++++++++--- .../org/apache/brooklyn/util/core/task/Tasks.java | 16 ++++-- .../util/core/task/AutoFlagsCallbackTest.java | 63 ++++++++++++++++++++++ 4 files changed, 130 insertions(+), 24 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java index ff24b6fe2f..d61c0416f2 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java @@ -19,15 +19,7 @@ package org.apache.brooklyn.util.core.task; import java.lang.reflect.Proxy; -import java.util.ArrayDeque; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.Deque; -import java.util.LinkedHashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -39,6 +31,7 @@ import java.util.concurrent.TimeoutException; import java.util.stream.Collectors; import com.google.common.annotations.Beta; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Function; import com.google.common.base.Supplier; import com.google.common.collect.Iterables; @@ -355,7 +348,7 @@ public class BasicExecutionContext extends AbstractExecutionContext { @SuppressWarnings({ "unchecked", "rawtypes" }) @Override protected <T> Task<T> 
submitInternal(Map<?,?> propertiesQ, final Object task) { - if (task instanceof TaskAdaptable<?> && !(task instanceof Task<?>)) + if (task instanceof TaskAdaptable<?> && !(task instanceof Task<?>)) return submitInternal(propertiesQ, ((TaskAdaptable<?>)task).asTask()); Map properties = MutableMap.copyOf(propertiesQ); @@ -394,8 +387,8 @@ public class BasicExecutionContext extends AbstractExecutionContext { ((ScheduledTask)task).executionContext = this; } else { - final Object startCallback = properties.get("newTaskStartCallback"); - properties.put("newTaskStartCallback", new Function<Task<?>, Void>() { + final Object startCallback = properties.get(BasicExecutionManager.TASK_START_CALLBACK_TAG); + properties.put(BasicExecutionManager.TASK_START_CALLBACK_TAG, new Function<Task<?>, Void>() { @Override public Void apply(Task<?> it) { registerPerThreadExecutionContext(); @@ -404,8 +397,8 @@ public class BasicExecutionContext extends AbstractExecutionContext { } }); - final Object endCallback = properties.get("newTaskEndCallback"); - properties.put("newTaskEndCallback", new Function<Task<?>, Void>() { + final Object endCallback = properties.get(BasicExecutionManager.TASK_END_CALLBACK_TAG); + properties.put(BasicExecutionManager.TASK_END_CALLBACK_TAG, new Function<Task<?>, Void>() { @Override public Void apply(Task<?> it) { try { @@ -601,4 +594,9 @@ public class BasicExecutionContext extends AbstractExecutionContext { public String toString() { return getClass().getSimpleName()+tags.stream().filter(t -> !(t instanceof ManagementContext)).collect(Collectors.toList()); } + + @VisibleForTesting + public Set<Object> getTagsLive() { + return tags; + } } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java index ead13412a8..7e522010ab 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java +++ 
b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionManager.java @@ -122,6 +122,11 @@ public class BasicExecutionManager implements ExecutionManager { return PerThreadCurrentTaskHolder.perThreadCurrentTask; } + /** See {@link BasicExecutionManager#getAutoFlagsLive()} */ + @VisibleForTesting @Beta public static final String TASK_START_CALLBACK_TAG = "newTaskStartCallback"; + /** See {@link BasicExecutionManager#getAutoFlagsLive()} */ + @VisibleForTesting @Beta public static final String TASK_END_CALLBACK_TAG = "newTaskEndCallback"; + /** * task names in this list will be print only in the trace level */ @@ -715,12 +720,33 @@ public class BasicExecutionManager implements ExecutionManager { } @Override - public <T> Task<T> submit(Map<?, ?> flags, TaskAdaptable<T> task) { - if (!(task instanceof Task)) - task = task.asTask(); + public <T> Task<T> submit(Map<?, ?> flags, TaskAdaptable<T> taskA) { + final Task<T> task = taskA instanceof Task ? (Task<T>) taskA : taskA.asTask(); + + if (!autoFlags.isEmpty()) { + MutableMap p2 = MutableMap.copyOf(flags); + autoFlags.forEach((k,v)->{ + if (!p2.containsKey(k)) { + p2.put(k, (Runnable) ()->BasicExecutionManager.invokeCallback(v, (Task) task)); + } else if (!Objects.equals(v, p2.get(k))) { + // will usually have these from BEC to register per-thread + if (BasicExecutionManager.TASK_START_CALLBACK_TAG.equals(k) || BasicExecutionManager.TASK_END_CALLBACK_TAG.equals(k)) { + Object v2 = p2.get(k); + p2.put(k, (Runnable) ()->{ + BasicExecutionManager.invokeCallback(v, (Task) task); + BasicExecutionManager.invokeCallback(v2, (Task) task); + }); + } else { + throw new IllegalStateException("Cannot have autoFlags and task-specific flags with the same unexpected key '"+k+"': "+task); + } + } + }); + flags = p2; + } + synchronized (task) { if (((TaskInternal<?>) task).getInternalFuture() != null) return (Task<T>) task; - return submitNewTask(flags, (Task<T>) task); + return submitNewTask(flags, task); } } @@ -1219,7 
+1245,7 @@ public class BasicExecutionManager implements ExecutionManager { jitterThreadStart(task); } if (flags != null && !startingThisThreadMightEndElsewhere) { - invokeCallback(flags.get("newTaskStartCallback"), task); + invokeCallback(flags.get(TASK_START_CALLBACK_TAG), task); } } @@ -1342,7 +1368,7 @@ public class BasicExecutionManager implements ExecutionManager { } if (flags != null && taskWasSubmittedAndNotYetEnded && startedGuaranteedToEndInSameThreadAndEndingSameThread) { - invokeCallback(flags.get("newTaskEndCallback"), task); + invokeCallback(flags.get(TASK_END_CALLBACK_TAG), task); } if (task.getEndTimeUtc() > 0) { if (taskWasSubmittedAndNotYetEnded) { @@ -1494,4 +1520,15 @@ public class BasicExecutionManager implements ExecutionManager { log.info("Setting task startup jittering maximum delay to " + jitterThreadsMaxDelay); } + + final Map autoFlags = MutableMap.of(); + /** Autoflags are flags which are automatically applied to tasks submitted to this execution manager. + * Intended for use during testing, such as inserting arbitrary delays. + * Not guaranteed to work in all situations (but plenty for inserting chaos monkey delays). + * Often used with {@link BasicExecutionManager#TASK_START_CALLBACK_TAG} and {@link BasicExecutionManager#TASK_END_CALLBACK_TAG}. 
+ */ + @VisibleForTesting @Beta + public Map getAutoFlagsLive() { + return autoFlags; + } } diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java index 39765c85cd..5442d31c0c 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/task/Tasks.java @@ -44,10 +44,7 @@ import javax.annotation.Nullable; import java.io.IOException; import java.io.PrintWriter; import java.io.Writer; -import java.util.Collection; -import java.util.Collections; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.Callable; import java.util.concurrent.CancellationException; import java.util.concurrent.ExecutionException; @@ -126,6 +123,17 @@ public class Tasks { if (proxy==null || proxy.equals(task)) return task; return getFinalProxyTarget(proxy); } + /** true if this is a task, and it is not a proxy task, i.e. it equals its {@link TaskInternal#getProxyTarget()}. + * (proxy tasks are wrappers and other things which are used by the framework rather than a real user-supplied task.) 
*/ + public static boolean isNonProxyTask(Task<?> task) { + if (task==null) return false; + return Objects.equals(task, getFinalProxyTarget(task)); + } + /** as {@link #isNonProxyTask(Task)} for the actual task in the current thread + * (using {@link BasicExecutionManager#getPerThreadCurrentTask()} not {@link Tasks#current()} which follows proxies) */ + public static boolean isNonProxyTask() { + return isNonProxyTask(BasicExecutionManager.getPerThreadCurrentTask().get()); + } /** creates a {@link ValueResolver} instance which allows significantly more customization than * the various {@link #resolveValue(Object, Class, ExecutionContext)} methods here */ diff --git a/core/src/test/java/org/apache/brooklyn/util/core/task/AutoFlagsCallbackTest.java b/core/src/test/java/org/apache/brooklyn/util/core/task/AutoFlagsCallbackTest.java new file mode 100644 index 0000000000..1064b545d8 --- /dev/null +++ b/core/src/test/java/org/apache/brooklyn/util/core/task/AutoFlagsCallbackTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.util.core.task; + +import org.apache.brooklyn.core.entity.Entities; +import org.apache.brooklyn.core.test.BrooklynAppUnitTestSupport; +import org.apache.brooklyn.test.Asserts; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.annotations.Test; + +import java.util.concurrent.atomic.AtomicReference; + +public class AutoFlagsCallbackTest extends BrooklynAppUnitTestSupport { + + private static final Logger log = LoggerFactory.getLogger(AutoFlagsCallbackTest.class); + + @Test + public void testCalledInOrder() { + AtomicReference<String> state = new AtomicReference<>("end"); + ((BasicExecutionManager) mgmt.getExecutionManager()).getAutoFlagsLive().put(BasicExecutionManager.TASK_START_CALLBACK_TAG, (Runnable) () -> { + if (Tasks.isNonProxyTask()) { + // above check is necessary to prevent this running for wrapper (proxy) tasks + log.info("STARTING TASK: " + Tasks.current()); + Asserts.assertTrue(state.compareAndSet("end", "start")); + } else { + // wrapper task eg DST + } + }); + ((BasicExecutionManager) mgmt.getExecutionManager()).getAutoFlagsLive().put(BasicExecutionManager.TASK_END_CALLBACK_TAG, (Runnable) () -> { + if (Tasks.isNonProxyTask()) { + state.compareAndSet("start-callback", "main"); + log.info("ENDING TASK: " + Tasks.current()); + Asserts.assertTrue(state.compareAndSet("start", "end")); + } else { + // wrapper task eg DST + } + }); + Entities.submit(app, Tasks.create("test1", () -> { + log.info("running test 1" + " / " + Tasks.current().getId()); + Asserts.assertEquals(state.get(), "start"); + })).getUnchecked(); + + Asserts.assertEquals(state.get(), "end"); + } + +}
