GitHub user aledsage commented on a diff in the pull request:
https://github.com/apache/brooklyn-server/pull/816#discussion_r141578971
--- Diff:
core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java
---
@@ -110,48 +248,53 @@ public ExecutionManager getExecutionManager() {
@SuppressWarnings("unchecked")
@Override
public <T> Maybe<T> getImmediately(Object callableOrSupplier) {
- BasicTask<?> fakeTaskForContext;
+ BasicTask<T> fakeTaskForContext;
if (callableOrSupplier instanceof BasicTask) {
- fakeTaskForContext = (BasicTask<?>)callableOrSupplier;
+ fakeTaskForContext = (BasicTask<T>)callableOrSupplier;
if (fakeTaskForContext.isQueuedOrSubmitted()) {
if (fakeTaskForContext.isDone()) {
- return Maybe.of((T)fakeTaskForContext.getUnchecked());
+ return Maybe.of(fakeTaskForContext.getUnchecked());
} else {
throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+fakeTaskForContext);
}
}
callableOrSupplier = fakeTaskForContext.getJob();
+ } else if (callableOrSupplier instanceof TaskAdaptable) {
+ return getImmediately( ((TaskAdaptable<T>)callableOrSupplier).asTask() );
} else {
- fakeTaskForContext = new BasicTask<Object>(MutableMap.of("displayName", "immediate evaluation"));
+ fakeTaskForContext = new BasicTask<T>(MutableMap.of("displayName", "Immediate evaluation"));
}
- fakeTaskForContext.tags.addAll(tags);
+ final ImmediateSupplier<T> job = callableOrSupplier instanceof ImmediateSupplier ? (ImmediateSupplier<T>) callableOrSupplier
+ : InterruptingImmediateSupplier.<T>of(callableOrSupplier);
fakeTaskForContext.tags.add(BrooklynTaskTags.IMMEDIATE_TASK_TAG);
fakeTaskForContext.tags.add(BrooklynTaskTags.TRANSIENT_TASK_TAG);
-
- Task<?> previousTask = BasicExecutionManager.getPerThreadCurrentTask().get();
- BasicExecutionContext oldExecutionContext = getCurrentExecutionContext();
- registerPerThreadExecutionContext();
- if (previousTask!=null) fakeTaskForContext.setSubmittedByTask(previousTask);
- fakeTaskForContext.cancel();
+ ContextSwitchingInfo<T> switchContextWrapper = getContextSwitchingTask(fakeTaskForContext, Collections.emptyList(), true);
+ if (switchContextWrapper!=null) {
+ return switchContextWrapper.context.getImmediately(switchContextWrapper.wrapperTask);
+ }
+
try {
- BasicExecutionManager.getPerThreadCurrentTask().set(fakeTaskForContext);
-
- if (!(callableOrSupplier instanceof ImmediateSupplier)) {
- callableOrSupplier = InterruptingImmediateSupplier.of(callableOrSupplier);
- }
- boolean wasAlreadyInterrupted = Thread.interrupted();
- try {
- return ((ImmediateSupplier<T>)callableOrSupplier).getImmediately();
- } finally {
- if (wasAlreadyInterrupted) {
- Thread.currentThread().interrupt();
- }
- }
-
- } finally {
- BasicExecutionManager.getPerThreadCurrentTask().set(previousTask);
- perThreadExecutionContext.set(oldExecutionContext);
+ return runInSameThread(fakeTaskForContext, new Callable<Maybe<T>>() {
--- End diff --
I don't understand how this works (or, more accurately, what it's supposed to
do).
My expectation for `getImmediately(task)` is that it tries to evaluate the
task immediately and, if it can't, leaves the task in a good state so that a
later `task.get()` by someone else still works.
However, that is not what happens when you call it on an unsubmitted task,
as the simple test below shows:
```
@Test
public void testGetImmediatelyForUnsubmittedTask() throws Exception {
ExecutionContext executionContext = ((EntityInternal)e).getExecutionContext();
Task<String> task = new BasicTask<String>(new Callable<String>() {
@Override public String call() throws Exception {
Time.sleep(Duration.ONE_SECOND);
return "done";
}});
Maybe<Object> result = executionContext.getImmediately(task);
System.out.println("result="+result);
System.out.println("task="+task+"; begun="+task.isBegun()+"; done="+task.isDone());
System.out.println("context.get(task)="+executionContext.get(task));
System.out.println("task.get="+task.get());
System.out.println("context.submit(task).get()="+executionContext.submit(task).get());
}
```
The `executionContext.get(task)` call throws a `CancellationException`. If you
comment that out, `task.get()` fails the same way. If you comment out both of
those, `executionContext.submit(task).get()` also fails with a
`CancellationException`.
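For comparison, the JDK's own `FutureTask` shows the same symptom once it has been cancelled before running. This is only an analogy: I'm assuming that `getImmediately` leaves the unsubmitted task marked as cancelled, which I haven't confirmed in the new code. The class and method names below are made up for illustration.

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.FutureTask;

// Hypothetical demo class: plain JDK only, not Brooklyn's BasicTask.
class CancelledTaskDemo {

    /** Cancels a never-started task, then tries to run and read it. */
    static String outcomeAfterCancel() {
        FutureTask<String> task = new FutureTask<>(() -> "done");

        // Cancel before the task has started (assumed analogue of what
        // happens to the task passed to getImmediately).
        task.cancel(false);

        // Running a cancelled FutureTask is a no-op; the callable never executes.
        task.run();

        try {
            return task.get();
        } catch (CancellationException e) {
            // A cancelled task can no longer yield a result.
            return "CancellationException";
        } catch (InterruptedException | ExecutionException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        System.out.println(outcomeAfterCancel());
    }
}
```

If the same thing is happening here, any task handed to `getImmediately` is effectively single-use, which is the opposite of what I'd expect from the method.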
---