This is an automated email from the ASF dual-hosted git repository.
heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
The following commit(s) were added to refs/heads/master by this push:
new 2327001553 allow checking call stacks across tasks
2327001553 is described below
commit 2327001553fef6e61a5c3a2243d106bb50a08820
Author: Alex Heneveld <[email protected]>
AuthorDate: Wed Apr 10 16:03:29 2024 +0100
allow checking call stacks across tasks
so that recursive references spanning threads are detected,
and improve error messages.
---
.../workflow/WorkflowExpressionResolution.java | 8 +--
.../util/core/task/BasicExecutionContext.java | 4 +-
.../util/core/task/CrossTaskThreadLocalStack.java | 71 ++++++++++++++++++++++
.../brooklyn/util/core/task/ValueResolver.java | 2 +-
.../brooklyn/util/core/text/TemplateProcessor.java | 3 +-
.../util/collections/ThreadLocalStack.java | 56 ++++++++++-------
6 files changed, 116 insertions(+), 28 deletions(-)
diff --git a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
index 658aeb1b4d..9831f2dccb 100644
--- a/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
+++ b/core/src/main/java/org/apache/brooklyn/core/workflow/WorkflowExpressionResolution.java
@@ -48,10 +48,10 @@ import org.apache.brooklyn.core.typereg.RegisteredTypes;
import org.apache.brooklyn.util.collections.Jsonya;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
-import org.apache.brooklyn.util.collections.ThreadLocalStack;
import org.apache.brooklyn.util.core.flags.TypeCoercions;
import
org.apache.brooklyn.util.core.predicates.ResolutionFailureTreatedAsAbsent;
import org.apache.brooklyn.util.core.task.DeferredSupplier;
+import org.apache.brooklyn.util.core.task.CrossTaskThreadLocalStack;
import org.apache.brooklyn.util.core.task.Tasks;
import org.apache.brooklyn.util.core.text.TemplateProcessor;
import org.apache.brooklyn.util.exceptions.Exceptions;
@@ -530,7 +530,7 @@ public class WorkflowExpressionResolution {
callable);
}
- static ThreadLocalStack<WorkflowResolutionStackEntry> RESOLVE_STACK = new
ThreadLocalStack<>(false);
+ static CrossTaskThreadLocalStack<WorkflowResolutionStackEntry>
RESOLVE_STACK = new CrossTaskThreadLocalStack<>(false);
<T> T inResolveStackEntry(String callPointUid, Object expression,
Supplier<T> code) {
return inResolveStackEntry(WorkflowResolutionStackEntry.of(this,
callPointUid, expression), null, code);
@@ -587,11 +587,11 @@ public class WorkflowExpressionResolution {
public Object processTemplateExpression(Object expression,
AllowBrooklynDslMode allowBrooklynDsl) {
return inResolveStackEntry(WorkflowResolutionStackEntry.of(this,
"process-template-expression", expression), () -> {
- throw new WorkflowVariableRecursiveReference("Recursive reference:
" + RESOLVE_STACK.getAll(false).stream().map(p -> "" +
p.expression).collect(Collectors.joining("->")));
+ throw new WorkflowVariableRecursiveReference("Recursive reference:
" + RESOLVE_STACK.stream().map(p -> "" +
p.expression).collect(Collectors.joining("->")));
}, () -> {
try {
if (RESOLVE_STACK.size() > 100) {
- throw new WorkflowVariableRecursiveReference("Reference
exceeded max depth 100: " + RESOLVE_STACK.getAll(false).stream().map(p -> "" +
p.expression).collect(Collectors.joining("->")));
+ throw new WorkflowVariableRecursiveReference("Reference
exceeded max depth 100: " + RESOLVE_STACK.stream().map(p -> "" +
p.expression).collect(Collectors.joining("->")));
}
Object result;
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
index b0757bacc8..fcbaf627ab 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
@@ -36,6 +36,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.stream.Collectors;
import com.google.common.annotations.Beta;
import com.google.common.base.Function;
@@ -45,6 +46,7 @@ import org.apache.brooklyn.api.entity.Entity;
import org.apache.brooklyn.api.mgmt.ExecutionContext;
import org.apache.brooklyn.api.mgmt.ExecutionManager;
import org.apache.brooklyn.api.mgmt.HasTaskChildren;
+import org.apache.brooklyn.api.mgmt.ManagementContext;
import org.apache.brooklyn.api.mgmt.Task;
import org.apache.brooklyn.api.mgmt.TaskAdaptable;
import org.apache.brooklyn.api.mgmt.entitlement.EntitlementContext;
@@ -589,6 +591,6 @@ public class BasicExecutionContext extends
AbstractExecutionContext {
@Override
public String toString() {
- return super.toString()+"("+tags+")";
+ return getClass().getSimpleName()+tags.stream().filter(t -> !(t
instanceof ManagementContext)).collect(Collectors.toList());
}
}
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/CrossTaskThreadLocalStack.java b/core/src/main/java/org/apache/brooklyn/util/core/task/CrossTaskThreadLocalStack.java
new file mode 100644
index 0000000000..8421395d73
--- /dev/null
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/CrossTaskThreadLocalStack.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.util.core.task;
+
+import java.util.Collection;
+import java.util.WeakHashMap;
+import java.util.stream.Stream;
+
+import com.google.common.collect.Streams;
+import org.apache.brooklyn.api.mgmt.Task;
+import org.apache.brooklyn.util.collections.ThreadLocalStack;
+
+public class CrossTaskThreadLocalStack<T> extends ThreadLocalStack<T> {
+
+ public CrossTaskThreadLocalStack(boolean acceptDuplicates) {
+ super(acceptDuplicates);
+ }
+ public CrossTaskThreadLocalStack() { super(); }
+
+ // override since we cannot access another thread's thread local
+ final WeakHashMap<Thread,Collection<T>> backingOverride = new
WeakHashMap<>();
+
+ @Override protected Collection<T> get() {
+ return get(Thread.currentThread());
+ }
+ protected Collection<T> get(Thread t) {
+ synchronized (backingOverride) { return backingOverride.get(t); }
+ }
+ @Override protected void set(Collection<T> value) {
+ synchronized (backingOverride) {
backingOverride.put(Thread.currentThread(), value); }
+ }
+ @Override protected void remove() {
+ synchronized (backingOverride) {
backingOverride.remove(Thread.currentThread()); }
+ }
+ @Override protected Collection<T> getCopyReversed() {
+ return getCopyReversed(Thread.currentThread());
+ }
+ protected Collection<T> getCopyReversed(Thread t) {
+ synchronized (backingOverride) { return copyReversed(get(t)); }
+ }
+
+ public Stream<T> stream() {
+ return concatSubmitterTaskThreadStacks(getCopyReversed().stream(),
Tasks.current());
+ }
+
+ protected Stream<T> concatSubmitterTaskThreadStacks(Stream<T> stream, Task
current) {
+ if (current==null) return stream;
+ Task submitter = current.getSubmittedByTask();
+ if (submitter==null) return stream;
+ Collection<T> ss = getCopyReversed(submitter.getThread());
+ if (ss!=null && !ss.isEmpty()) stream = Streams.concat(stream,
ss.stream());
+ return concatSubmitterTaskThreadStacks(stream, submitter);
+ }
+
+}
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
index 03e9ede5cb..446f8d70b6 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/task/ValueResolver.java
@@ -688,7 +688,7 @@ public class ValueResolver<T> implements
DeferredSupplier<T>, Iterable<Maybe<Obj
} catch (Exception e) {
Exceptions.propagateIfFatal(e);
- String msg = "Error resolving "+(description!=null ?
description+", " : "")+v+", in "+exec;
+ String msg = (description!=null ? "Error in resolution:
"+description+"," : "Error resolving value") + " at "+exec;
String eTxt = Exceptions.collapseText(e);
IllegalArgumentException problem = eTxt.startsWith(msg) ? new
IllegalArgumentException(e) : new IllegalArgumentException(msg+": "+eTxt, e);
if (swallowExceptions) {
diff --git a/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java b/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java
index af00cbce16..3777c322f4 100644
--- a/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java
+++ b/core/src/main/java/org/apache/brooklyn/util/core/text/TemplateProcessor.java
@@ -49,6 +49,7 @@ import
org.apache.brooklyn.core.workflow.WorkflowExpressionResolution;
import org.apache.brooklyn.util.collections.MutableList;
import org.apache.brooklyn.util.collections.MutableMap;
import org.apache.brooklyn.util.collections.ThreadLocalStack;
+import org.apache.brooklyn.util.core.task.CrossTaskThreadLocalStack;
import org.apache.brooklyn.util.exceptions.Exceptions;
import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
import org.apache.brooklyn.util.guava.Maybe;
@@ -95,7 +96,7 @@ public class TemplateProcessor {
public static void closeLocalTemplateModelCache() {
TEMPLATE_MODEL_UNWRAP_CACHE.pop(); }
static ThreadLocalStack<String> TEMPLATE_FILE_WANTING_LEGACY_SYNTAX = new
ThreadLocalStack<>();
- static ThreadLocalStack<Boolean> IS_FOR_WORKFLOW = new
ThreadLocalStack<>();
+ static CrossTaskThreadLocalStack<Boolean> IS_FOR_WORKFLOW = new
CrossTaskThreadLocalStack<>();
public interface UnwrappableTemplateModel {
Maybe<Object> unwrap();
diff --git a/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java b/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java
index f9a3f90479..4170dbcf8d 100644
--- a/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java
+++ b/utils/common/src/main/java/org/apache/brooklyn/util/collections/ThreadLocalStack.java
@@ -24,6 +24,7 @@ import org.apache.brooklyn.util.guava.Maybe;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
+import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;
@@ -37,28 +38,38 @@ public class ThreadLocalStack<T> implements Iterable<T> {
}
public ThreadLocalStack() { this.acceptDuplicates = true; }
- final ThreadLocal<Collection<T>> set = new ThreadLocal<>();
+ protected final ThreadLocal<Collection<T>> backing = new ThreadLocal<>();
- public Collection<T> getAll(boolean forceInitialized) {
- Collection<T> result = set.get();
- if (forceInitialized && result==null) {
+ protected Collection<T> get() {
+ return backing.get();
+ }
+ protected void set(Collection<T> value) {
+ backing.set(value);
+ }
+ protected void remove() {
+ backing.remove();
+ }
+
+ protected Collection<T> upsert() {
+ Collection<T> result = get();
+ if (result==null) {
result = acceptDuplicates ? MutableList.of() : MutableSet.of();
- set.set(result);
+ set(result);
}
return result;
}
public T pop() {
- Collection<T> resultS = getAll(true);
+ Collection<T> resultS = upsert();
T last = Iterables.getLast(resultS);
resultS.remove(last);
- if (resultS.isEmpty()) set.remove();
+ if (resultS.isEmpty()) remove();
return last;
}
/** returns true unless duplicates are not accepted, in which case it
returns false iff the object supplied is equal to one already present */
public boolean push(T object) {
- return getAll(true).add(object);
+ return upsert().add(object);
}
/** top of stack first */
@@ -69,23 +80,28 @@ public class ThreadLocalStack<T> implements Iterable<T> {
/** top of stack first */
public Stream<T> stream() {
- MutableList<T> l = MutableList.copyOf(getAll(false));
+ return getCopyReversed().stream();
+ }
+ protected Collection<T> getCopyReversed() {
+ return copyReversed(get());
+ }
+ protected Collection<T> copyReversed(Collection<T> c1) {
+ List<T> l = MutableList.copyOf(c1);
Collections.reverse(l);
- return l.stream();
+ return l;
}
public Maybe<T> peek() {
- Collection<T> resultS = getAll(false);
- if (resultS==null || resultS.isEmpty()) return Maybe.absent("Nothing
in local stack");
- return Maybe.of( Iterables.getLast(resultS) );
+ Iterator<T> si = stream().iterator();
+ if (!si.hasNext()) return Maybe.absent("Nothing in local stack");
+ return Maybe.of( si.next() );
}
public Maybe<T> peekPenultimate() {
- Collection<T> resultS = getAll(false);
- if (resultS==null) return Maybe.absent();
- int size = resultS.size();
- if (size<=1) return Maybe.absent();
- return Maybe.of( Iterables.get(resultS, size-2) );
+ Iterator<T> si = stream().iterator();
+ if (si.hasNext()) si.next();
+ if (!si.hasNext()) return Maybe.absent();
+ return Maybe.of( si.next() );
}
public void pop(T entry) {
@@ -96,8 +112,6 @@ public class ThreadLocalStack<T> implements Iterable<T> {
}
public int size() {
- Collection<T> v = getAll(false);
- if (v==null) return 0;
- return v.size();
+ return (int) stream().count();
}
}