Github user aledsage commented on a diff in the pull request: https://github.com/apache/brooklyn-server/pull/816#discussion_r141573429 --- Diff: core/src/main/java/org/apache/brooklyn/util/core/task/BasicExecutionContext.java --- @@ -100,6 +123,121 @@ public ExecutionManager getExecutionManager() { @Override public Set<Task<?>> getTasks() { return executionManager.getTasksWithAllTags(tags); } + @Override + public <T> T get(TaskAdaptable<T> task) { + final TaskInternal<T> t = (TaskInternal<T>) task.asTask(); + + if (t.isQueuedOrSubmitted()) { + if (t.isDone()) { + return t.getUnchecked(); + } else { + throw new ImmediateUnsupportedException("Task is in progress and incomplete: "+t); + } + } + + ContextSwitchingInfo<T> switchContextWrapper = getContextSwitchingTask(t, Collections.emptyList(), false); + if (switchContextWrapper!=null) { + return switchContextWrapper.context.get(switchContextWrapper.wrapperTask); + } + + try { + return runInSameThread(t, new Callable<Maybe<T>>() { + public Maybe<T> call() throws Exception { + return Maybe.of(t.getJob().call()); + } + }).get(); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } + + private static class SimpleFuture<T> implements Future<T> { + boolean cancelled = false; + boolean done = false; + Maybe<T> result; + + public synchronized Maybe<T> set(Maybe<T> result) { + this.result = result; + done = true; + notifyAll(); + return result; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + cancelled = true; + return true; + } + + @Override + public boolean isCancelled() { + return cancelled; + } + + @Override + public boolean isDone() { + return done || cancelled; + } + + @Override + public T get() throws InterruptedException, ExecutionException { + if (!isDone()) { + synchronized (this) { + while (!isDone()) { + wait(1000); + } + } + } + if (isCancelled() && !done) { + throw new CancellationException(); + } + return result.get(); + } + + @Override + public synchronized T get(long 
timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + if (isDone()) return get(); + CountdownTimer time = CountdownTimer.newInstanceStarted( Duration.of(timeout, unit) ); + while (!time.isExpired()) { + wait(time.getDurationRemaining().lowerBound(Duration.ONE_MILLISECOND).toMilliseconds()); + if (isDone()) return get(); + } + throw new TimeoutException(); + } + } + + private <T> Maybe<T> runInSameThread(final Task<T> task, Callable<Maybe<T>> job) throws Exception { --- End diff -- This signature is kind of confusing. Looking at how it's used, I see why you pass in a job (so that the job can call either `getImmediately()` or `task.getJob().call()`). It's probably worth adding javadoc to explain what this does. I think it will ignore the task's job, and instead execute the given Callable `job` that is passed in (using the task for its context, in terms of tags etc.). It sets the task's result to be the job's result (setting a future, so the task will be seen as in-progress while the job is executed).
---