http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/BasicTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/BasicTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicTask.java new file mode 100644 index 0000000..c776e4d --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/BasicTask.java @@ -0,0 +1,892 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; + +import static brooklyn.util.JavaGroovyEquivalents.asString; +import static brooklyn.util.JavaGroovyEquivalents.elvisString; +import groovy.lang.Closure; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.lang.management.LockInfo; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.util.Collections; +import java.util.ConcurrentModificationException; +import java.util.LinkedHashSet; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; + +import org.apache.brooklyn.api.management.HasTaskChildren; +import org.apache.brooklyn.api.management.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.util.GroovyJavaMethods; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.guava.Maybe; +import brooklyn.util.text.Identifiers; +import brooklyn.util.text.Strings; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.annotations.Beta; +import com.google.common.base.Function; +import com.google.common.base.Objects; +import com.google.common.base.Throwables; +import com.google.common.collect.Iterables; +import com.google.common.collect.Sets; +import com.google.common.util.concurrent.Callables; +import com.google.common.util.concurrent.ExecutionList; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * The basic concrete implementation of a {@link Task} to be executed. 
+ * + * A {@link Task} is a wrapper for an executable unit, such as a {@link Closure} or a {@link Runnable} or + * {@link Callable} and will run in its own {@link Thread}. + * <p> + * The task can be given an optional displayName and description in its constructor (as named + * arguments in the first {@link Map} parameter). It is guaranteed to have {@link Object#notify()} called + * once whenever the task starts running and once again when the task is about to complete. Due to + * the way executors work it is ugly to guarantee notification <em>after</em> completion, so instead we + * notify just before then expect the user to call {@link #get()} - which will throw errors if the underlying job + * did so - or {@link #blockUntilEnded()} which will not throw errors. + */ +public class BasicTask<T> implements TaskInternal<T> { + private static final Logger log = LoggerFactory.getLogger(BasicTask.class); + + private String id = Identifiers.makeRandomId(8); + protected Callable<T> job; + public final String displayName; + public final String description; + + protected final Set<Object> tags = Sets.newConcurrentHashSet(); + // for debugging, to record where tasks were created +// { tags.add(new Throwable("Creation stack trace")); } + + protected Task<?> proxyTargetTask = null; + + protected String blockingDetails = null; + protected Task<?> blockingTask = null; + Object extraStatusText = null; + + /** listeners attached at task level; these are stored here, but run on the underlying ListenableFuture */ + protected final ExecutionList listeners = new ExecutionList(); + + /** + * Constructor needed to prevent confusion in groovy stubs when looking for default constructor, + * + * The generics on {@link Closure} break it if that is first constructor. 
+ */ + protected BasicTask() { this(Collections.emptyMap()); } + protected BasicTask(Map<?,?> flags) { this(flags, (Callable<T>) null); } + + public BasicTask(Callable<T> job) { this(Collections.emptyMap(), job); } + /** + * Primary constructor; every other constructor of this class delegates to it. + * <p> + * The entries {@code tag}, {@code tags}, {@code description} and {@code displayName} are read from + * {@code flags} and removed from it, so the supplied map is modified. {@code tag} is added as a single tag; + * {@code tags} should be an {@link Iterable} whose elements are all added (any other non-null value is + * added as one tag, with a deprecation message logged). A null display name is stored as the empty string. + * + * @param flags named arguments as described above + * @param job the work this task wraps; null when called from the flags-only constructor + */ + public BasicTask(Map<?,?> flags, Callable<T> job) { + this.job = job; + + if (flags.containsKey("tag")) tags.add(flags.remove("tag")); + Object ftags = flags.remove("tags"); + if (ftags!=null) { + if (ftags instanceof Iterable) Iterables.addAll(tags, (Iterable<?>)ftags); + else { + log.info("deprecated use of non-collection argument for 'tags' ("+ftags+") in "+this, new Throwable("trace of discouraged use of non-colleciton tags argument")); + tags.add(ftags); + } + } + + description = elvisString(flags.remove("description"), ""); + String d = asString(flags.remove("displayName")); + displayName = (d==null ? "" : d); + } + + public BasicTask(Runnable job) { this(GroovyJavaMethods.<T>callableFromRunnable(job)); } + public BasicTask(Map<?,?> flags, Runnable job) { this(flags, GroovyJavaMethods.<T>callableFromRunnable(job)); } + public BasicTask(Closure<T> job) { this(GroovyJavaMethods.callableFromClosure(job)); } + public BasicTask(Map<?,?> flags, Closure<T> job) { this(flags, GroovyJavaMethods.callableFromClosure(job)); } + + @Override + public String getId() { + return id; + } + /** the hash code is derived from the task id only, in line with {@link #equals(Object)} */ + @Override + public int hashCode() { + return Objects.hashCode(id); + } + /** a task is equal to any other {@link Task} (not only a BasicTask) which reports the same id */ + @Override + public boolean equals(Object obj) { + if (obj instanceof Task) + return ((Task<?>)obj).getId().equals(getId()); + return false; + } + + @Override + public String toString() { + // give display name plus id, or job and tags plus id; some jobs have been extended to include nice tostrings + return "Task["+ + (Strings.isNonEmpty(displayName) ? + displayName : + (job + (tags!=null && !tags.isEmpty() ? 
";"+tags : "")) ) + + ":"+getId()+"]"; + } + + @Override + public Task<T> asTask() { + return this; + } + + // housekeeping -------------------- + + /* + * These flags are set by BasicExecutionManager.submit. + * + * Order is guaranteed to be as shown below, in order of #. Within each # line it is currently in the order specified by commas but this is not guaranteed. + * (The spaces between the # section indicate longer delays / logical separation ... it should be clear!) + * + * # submitter, submit time set, tags and other submit-time fields set + * + * # thread set, ThreadLocal getCurrentTask set + * # start time set, isBegun is true + * # task end callback run, if supplied + * + * # task runs + * + * # task end callback run, if supplied + * # end time set + * # thread cleared, ThreadLocal getCurrentTask set + * # Task.notifyAll() + * # Task.get() (result.get()) available, Task.isDone is true + * + * Few _consumers_ should care, but internally we rely on this so that, for example, status is displayed correctly. + * Tests should catch most things, but be careful if you change any of the above semantics. 
+ */ + /** timestamps recording the task's lifecycle; each holds -1 until it has been set */ + protected long queuedTimeUtc = -1; + protected long submitTimeUtc = -1; + protected long startTimeUtc = -1; + protected long endTimeUtc = -1; + protected Maybe<Task<?>> submittedByTask; + + protected volatile Thread thread = null; + private volatile boolean cancelled = false; + /** normally a {@link ListenableFuture}, except for scheduled tasks when it may be a {@link ScheduledFuture} */ + protected volatile Future<T> internalFuture = null; + /** + * Supplies the future which backs this task, then wakes any threads waiting on this task's monitor + * (for example in {@link #blockUntilStarted(Duration)}). + * + * @throws IllegalStateException if a future has already been supplied + */ + @Override + public synchronized void initInternalFuture(ListenableFuture<T> result) { + if (this.internalFuture != null) + throw new IllegalStateException("task "+this+" is being given a result twice"); + this.internalFuture = result; + notifyAll(); + } + + // metadata accessors ------------ + /** returns an unmodifiable copy of the tags, taken at the time of the call */ + @Override + public Set<Object> getTags() { return Collections.unmodifiableSet(new LinkedHashSet<Object>(tags)); } + + /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here; + * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */ + @Override + public long getQueuedTimeUtc() { return queuedTimeUtc; } + + @Override + public long getSubmitTimeUtc() { return submitTimeUtc; } + + @Override + public long getStartTimeUtc() { return startTimeUtc; } + + @Override + public long getEndTimeUtc() { return endTimeUtc; } + + @Override + public Future<T> getInternalFuture() { return internalFuture; } + /** returns the task recorded as having submitted this one; null if none was recorded or if the stored {@link Maybe} yields null */ + @Override + public Task<?> getSubmittedByTask() { + if (submittedByTask==null) return null; + return submittedByTask.orNull(); + } + + /** the thread where the task is running, if it is running */ + @Override + public Thread getThread() { return thread; } + + // basic fields -------------------- + /** true once a queued time has been recorded, see {@link #markQueued()} */ + @Override + public boolean isQueued() { + return (queuedTimeUtc >= 0); + } + + @Override + public boolean isQueuedOrSubmitted() { + return isQueued() || isSubmitted(); + } + + @Override + public boolean isQueuedAndNotSubmitted() { + return 
isQueued() && (!isSubmitted()); + } + /** true once a submit time has been recorded */ + @Override + public boolean isSubmitted() { + return submitTimeUtc >= 0; + } + /** true once a start time has been recorded */ + @Override + public boolean isBegun() { + return startTimeUtc >= 0; + } + + /** marks the task as queued for execution */ + @Override + public void markQueued() { + if (queuedTimeUtc<0) + queuedTimeUtc = System.currentTimeMillis(); + } + /** cancels this task, allowing it to be interrupted if it is running; same as {@code cancel(true)} */ + @Override + public final synchronized boolean cancel() { return cancel(true); } + + /** doesn't resume it, just means if something was cancelled but not submitted it could now be submitted; + * probably going to be removed and perhaps some mechanism for running again made available + * @since 0.7.0 */ + @Beta + public synchronized boolean uncancel() { + boolean wasCancelled = cancelled; + cancelled = false; + return wasCancelled; + } + /** + * Marks this task as cancelled, cancels the underlying future if one has been assigned, + * and wakes any threads waiting on this task's monitor. + * + * @param mayInterruptIfRunning passed on to the underlying future's cancel method + * @return false if the task was already done; otherwise the result of cancelling the underlying future, + * or true if no future has been assigned yet + */ + @Override + public synchronized boolean cancel(boolean mayInterruptIfRunning) { + if (isDone()) return false; + boolean cancel = true; + cancelled = true; + if (internalFuture!=null) { + cancel = internalFuture.cancel(mayInterruptIfRunning); + } + notifyAll(); + return cancel; + } + /** true if this task has been marked cancelled, or if its underlying future reports that it was cancelled */ + @Override + public boolean isCancelled() { + return cancelled || (internalFuture!=null && internalFuture.isCancelled()); + } + /** true if the task is marked cancelled, if its underlying future is done, or if an end time has been recorded */ + @Override + public boolean isDone() { + // if endTime is set, result might not be completed yet, but it will be set very soon + // (the two values are set close in time, result right after the endTime; + // but callback hooks might not see the result yet) + return cancelled || (internalFuture!=null && internalFuture.isDone()) || endTimeUtc>0; + } + + /** + * Returns true if the task has had an error. + * + * Only true if calling {@link #get()} will throw an exception when it completes (including cancel). + * Implementations may set this true before completion if they have that insight, or + * (the default) they may compute it lazily after completion (returning false before completion). 
+ */ + @Override + public boolean isError() { + if (!isDone()) return false; + if (isCancelled()) return true; + try { + get(); + return false; + } catch (Throwable t) { + return true; + } + } + + // future value -------------------- + /** + * Waits until a future has been assigned to this task (see {@link #blockUntilStarted()}) and returns that future's result. + * If the task is not already done it is first passed to {@code Tasks.setBlockingTask}; {@code Tasks.resetBlockingTask} + * is always called before this method exits. + */ + @Override + public T get() throws InterruptedException, ExecutionException { + try { + if (!isDone()) + Tasks.setBlockingTask(this); + blockUntilStarted(); + return internalFuture.get(); + } finally { + Tasks.resetBlockingTask(); + } + } + /** as {@link #get()}, except that any exception is rethrown through {@code Exceptions.propagate} */ + @Override + public T getUnchecked() { + try { + return get(); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } + /** waits, with no time limit, until a future has been assigned to this task */ + @Override + public synchronized void blockUntilStarted() { + blockUntilStarted(null); + } + /** + * Waits on this task's monitor until a future has been assigned to this task. + * If the waiting thread is interrupted, its interrupt flag is set again and the exception is rethrown through {@code Throwables.propagate}. + * + * @param timeout the longest time to wait, or null to wait indefinitely + * @return true once a future has been assigned; false if the timeout elapsed first + * @throws CancellationException if this task has been marked cancelled + */ + @Override + public synchronized boolean blockUntilStarted(Duration timeout) { + Long endTime = timeout==null ? null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp(); + while (true) { + if (cancelled) throw new CancellationException(); + if (internalFuture==null) + try { + if (timeout==null) { + wait(); + } else { + long remaining = endTime - System.currentTimeMillis(); + if (remaining>0) + wait(remaining); + else + return false; + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + Throwables.propagate(e); + } + if (internalFuture!=null) return true; + } + } + /** waits, with no time limit, for the task to end; see {@link #blockUntilEnded(Duration)} */ + @Override + public void blockUntilEnded() { + blockUntilEnded(null); + } + /** + * Waits for the task to be started and then for its underlying future to complete, without throwing the task's own error: + * fatal errors are rethrown through {@code Exceptions.propagateIfFatal}, anything else is caught (and logged at debug level unless it is a timeout). + * + * @param timeout the longest time to wait, or null to wait indefinitely + * @return false if the task was not started within the timeout; otherwise the value of {@link #isDone()} + */ + @Override + public boolean blockUntilEnded(Duration timeout) { + Long endTime = timeout==null ? 
null : System.currentTimeMillis() + timeout.toMillisecondsRoundingUp(); + try { + boolean started = blockUntilStarted(timeout); + if (!started) return false; + if (timeout==null) { + internalFuture.get(); + } else { + long remaining = endTime - System.currentTimeMillis(); + if (remaining>0) + internalFuture.get(remaining, TimeUnit.MILLISECONDS); + } + return isDone(); + } catch (Throwable t) { + Exceptions.propagateIfFatal(t); + if (!(t instanceof TimeoutException) && log.isDebugEnabled()) + log.debug("call from "+Thread.currentThread()+", blocking until '"+this+"' finishes, ended with error: "+t); + return isDone(); + } + } + + @Override + public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { + return get(new Duration(timeout, unit)); + } + + @Override + public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException { + long start = System.currentTimeMillis(); + Long end = duration==null ? null : start + duration.toMillisecondsRoundingUp(); + while (end==null || end > System.currentTimeMillis()) { + if (cancelled) throw new CancellationException(); + if (internalFuture == null) { + synchronized (this) { + long remaining = end - System.currentTimeMillis(); + if (internalFuture==null && remaining>0) + wait(remaining); + } + } + if (internalFuture != null) break; + } + Long remaining = end==null ? 
null : end - System.currentTimeMillis(); + if (isDone()) { + return internalFuture.get(1, TimeUnit.MILLISECONDS); + } else if (remaining == null) { + return internalFuture.get(); + } else if (remaining > 0) { + return internalFuture.get(remaining, TimeUnit.MILLISECONDS); + } else { + throw new TimeoutException(); + } + } + + @Override + public T getUnchecked(Duration duration) { + try { + return get(duration); + } catch (Exception e) { + throw Exceptions.propagate(e); + } + } + + // ------------------ status --------------------------- + + /** + * Returns a brief status string + * + * Plain-text format. Reported status if there is one, otherwise state which will be one of: + * <ul> + * <li>Not submitted + * <li>Submitted for execution + * <li>Ended by error + * <li>Ended by cancellation + * <li>Ended normally + * <li>Running + * <li>Waiting + * </ul> + */ + @Override + public String getStatusSummary() { + return getStatusString(0); + } + + /** + * Returns detailed status, suitable for a hover + * + * Plain-text format, with new-lines (and sometimes extra info) if multiline enabled. + */ + @Override + public String getStatusDetail(boolean multiline) { + return getStatusString(multiline?2:1); + } + + /** + * This method is useful for callers to see the status of a task. 
+ * + * Also for developers to see best practices for examining status fields etc + * + * @param verbosity 0 = brief, 1 = one-line with some detail, 2 = lots of detail + */ + protected String getStatusString(int verbosity) { +// Thread t = getThread(); + String rv; + if (submitTimeUtc <= 0) rv = "Not submitted"; + else if (!isCancelled() && startTimeUtc <= 0) { + rv = "Submitted for execution"; + if (verbosity>0) { + long elapsed = System.currentTimeMillis() - submitTimeUtc; + rv += " "+Time.makeTimeStringRoundedSince(elapsed)+" ago"; + } + if (verbosity >= 2 && getExtraStatusText()!=null) { + rv += "\n\n"+getExtraStatusText(); + } + } else if (isDone()) { + long elapsed = endTimeUtc - submitTimeUtc; + String duration = Time.makeTimeStringRounded(elapsed); + if (isCancelled()) { + rv = "Cancelled"; + if (verbosity >= 1) rv+=" after "+duration; + + if (verbosity >= 2 && getExtraStatusText()!=null) { + rv += "\n\n"+getExtraStatusText(); + } + } else if (isError()) { + rv = "Failed"; + if (verbosity >= 1) { + rv += " after "+duration; + Throwable error = Tasks.getError(this); + + if (verbosity >= 2 && getExtraStatusText()!=null) { + rv += "\n\n"+getExtraStatusText(); + } + + //remove outer ExecException which is reported by the get(), we want the exception the task threw + while (error instanceof ExecutionException) error = error.getCause(); + String errorMessage = Exceptions.collapseText(error); + + if (verbosity == 1) rv += ": "+abbreviate(errorMessage); + if (verbosity >= 2) { + rv += ": "+errorMessage; + StringWriter sw = new StringWriter(); + ((Throwable)error).printStackTrace(new PrintWriter(sw)); + rv += "\n\n"+sw.getBuffer(); + } + } + } else { + rv = "Completed"; + if (verbosity>=1) { + if (verbosity==1) { + try { + Object v = get(); + rv += ", " +(v==null ? 
"no return value (null)" : "result: "+abbreviate(v.toString())); + } catch (Exception e) { + rv += ", but error accessing result ["+e+"]"; //shouldn't happen + } + } else { + rv += " after "+duration; + try { + Object v = get(); + rv += "\n\n" + (v==null ? "No return value (null)" : "Result: "+v); + } catch (Exception e) { + rv += " at first\n" + + "Error accessing result ["+e+"]"; //shouldn't happen + } + if (verbosity >= 2 && getExtraStatusText()!=null) { + rv += "\n\n"+getExtraStatusText(); + } + } + } + } + } else { + rv = getActiveTaskStatusString(verbosity); + } + return rv; + } + + private static String abbreviate(String s) { + s = Strings.getFirstLine(s); + if (s.length()>255) s = s.substring(0, 252)+ "..."; + return s; + } + + protected String getActiveTaskStatusString(int verbosity) { + String rv = ""; + Thread t = getThread(); + + // Normally, it's not possible for thread==null as we were started and not ended + + // However, there is a race where the task starts sand completes between the calls to getThread() + // at the start of the method and this call to getThread(), so both return null even though + // the intermediate checks returned started==true isDone()==false. + if (t == null) { + if (isDone()) { + return getStatusString(verbosity); + } else { + //should only happen for repeating task which is not active + return "Sleeping"; + } + } + + ThreadInfo ti = ManagementFactory.getThreadMXBean().getThreadInfo(t.getId(), (verbosity<=0 ? 0 : verbosity==1 ? 
1 : Integer.MAX_VALUE)); + if (getThread()==null) + //thread might have moved on to a new task; if so, recompute (it should now say "done") + return getStatusString(verbosity); + + if (verbosity >= 1 && Strings.isNonBlank(blockingDetails)) { + if (verbosity==1) + // short status string will just show blocking details + return blockingDetails; + //otherwise show the blocking details, then a new line, then additional information + rv = blockingDetails + "\n\n"; + } + + if (verbosity >= 1 && blockingTask!=null) { + if (verbosity==1) + // short status string will just show blocking details + return "Waiting on "+blockingTask; + //otherwise show the blocking details, then a new line, then additional information + rv = "Waiting on "+blockingTask + "\n\n"; + } + + if (verbosity>=2) { + if (getExtraStatusText()!=null) { + rv += getExtraStatusText()+"\n\n"; + } + + rv += ""+toString()+"\n"; + if (submittedByTask!=null) { + rv += "Submitted by "+submittedByTask+"\n"; + } + + if (this instanceof HasTaskChildren) { + // list children tasks for compound tasks + try { + Iterable<Task<?>> childrenTasks = ((HasTaskChildren)this).getChildren(); + if (childrenTasks.iterator().hasNext()) { + rv += "Children:\n"; + for (Task<?> child: childrenTasks) { + rv += " "+child+": "+child.getStatusDetail(false)+"\n"; + } + } + } catch (ConcurrentModificationException exc) { + rv += " (children not available - currently being modified)\n"; + } + } + rv += "\n"; + } + + LockInfo lock = ti.getLockInfo(); + rv += "In progress"; + if (verbosity>=1) { + if (lock==null && ti.getThreadState()==Thread.State.RUNNABLE) { + //not blocked + if (ti.isSuspended()) { + // when does this happen? 
+ rv += ", thread suspended"; + } else { + if (verbosity >= 2) rv += " ("+ti.getThreadState()+")"; + } + } else { + rv +=", thread waiting "; + if (ti.getThreadState() == Thread.State.BLOCKED) { + rv += "(mutex) on "+lookup(lock); + //TODO could say who holds it + } else if (ti.getThreadState() == Thread.State.WAITING) { + rv += "(notify) on "+lookup(lock); + } else if (ti.getThreadState() == Thread.State.TIMED_WAITING) { + rv += "(timed) on "+lookup(lock); + } else { + rv = "("+ti.getThreadState()+") on "+lookup(lock); + } + } + } + if (verbosity>=2) { + StackTraceElement[] st = ti.getStackTrace(); + st = brooklyn.util.javalang.StackTraceSimplifier.cleanStackTrace(st); + if (st!=null && st.length>0) + rv += "\n" +"At: "+st[0]; + for (int ii=1; ii<st.length; ii++) { + rv += "\n" +" "+st[ii]; + } + } + return rv; + } + + protected String lookup(LockInfo info) { + return info!=null ? ""+info : "unknown (sleep)"; + } + + @Override + public String getDisplayName() { + return displayName; + } + + @Override + public String getDescription() { + return description; + } + + + /** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait, + * and typically cleared immediately afterwards; referenced by management api to inspect a task + * which is blocking + */ + @Override + public String setBlockingDetails(String blockingDetails) { + String old = this.blockingDetails; + this.blockingDetails = blockingDetails; + return old; + } + + @Override + public Task<?> setBlockingTask(Task<?> blockingTask) { + Task<?> old = this.blockingTask; + this.blockingTask = blockingTask; + return old; + } + + @Override + public void resetBlockingDetails() { + this.blockingDetails = null; + } + + @Override + public void resetBlockingTask() { + this.blockingTask = null; + } + + /** returns a textual message giving details while the task is blocked */ + @Override + public String getBlockingDetails() { + return blockingDetails; + } + + /** returns a task that 
this task is blocked on */ + @Override + public Task<?> getBlockingTask() { + return blockingTask; + } + + @Override + public void setExtraStatusText(Object extraStatus) { + this.extraStatusText = extraStatus; + } + + @Override + public Object getExtraStatusText() { + return extraStatusText; + } + + // ---- add a way to warn if task is not run + /** callback which {@link BasicTask#finalize()} runs, passing the task being finalized */ + public interface TaskFinalizer { + public void onTaskFinalization(Task<?> t); + } + /** + * The finalizer used when none has been set on a task. It logs a warning if the task was never submitted + * and {@code Tasks.isAncestorCancelled} is false for it; otherwise it logs a warning if the task is not done. + */ + public static final TaskFinalizer WARN_IF_NOT_RUN = new TaskFinalizer() { + @Override + public void onTaskFinalization(Task<?> t) { + if (!Tasks.isAncestorCancelled(t) && !t.isSubmitted()) { + log.warn(t+" was never submitted; did the code create it and forget to run it? ('cancel' the task to suppress this message)"); + log.debug("Detail of unsubmitted task "+t+":\n"+t.getStatusDetail(true)); + return; + } + if (!t.isDone()) { + // shouldn't happen + // TODO But does happen if management context was terminated (e.g. running test suite). + // Should check if Execution Manager is running, and only log if it was not terminated? 
+ log.warn("Task "+t+" is being finalized before completion"); + return; + } + } + }; + /** a finalizer which does nothing */ + public static final TaskFinalizer NO_OP = new TaskFinalizer() { + @Override + public void onTaskFinalization(Task<?> t) { + } + }; + /** sets {@link #NO_OP} as this task's finalizer, in place of the default {@link #WARN_IF_NOT_RUN} */ + public void ignoreIfNotRun() { + setFinalizer(NO_OP); + } + /** + * Sets the finalizer for this task by adding it to the task's tags. + * + * @throws IllegalStateException if a different finalizer is already present, or if the task is already done + */ + public void setFinalizer(TaskFinalizer f) { + TaskFinalizer finalizer = Tasks.tag(this, TaskFinalizer.class, false); + if (finalizer!=null && finalizer!=f) + throw new IllegalStateException("Cannot apply multiple finalizers"); + if (isDone()) + throw new IllegalStateException("Finalizer cannot be set on task "+this+" after it is finished"); + tags.add(f); + } + /** runs the finalizer which {@code Tasks.tag} returns for this task, or {@link #WARN_IF_NOT_RUN} if it returns null */ + @Override + protected void finalize() throws Throwable { + TaskFinalizer finalizer = Tasks.tag(this, TaskFinalizer.class, false); + if (finalizer==null) finalizer = WARN_IF_NOT_RUN; + finalizer.onTaskFinalization(this); + } + /** + * An {@link Executor} wrapper used for task listeners: an exception thrown by the target's {@code execute} is logged + * rather than propagated, and nothing is executed if the target is an {@link ExecutorService} which has been shut down. + */ + public static class SubmissionErrorCatchingExecutor implements Executor { + final Executor target; + public SubmissionErrorCatchingExecutor(Executor target) { + this.target = target; + } + @Override + public void execute(Runnable command) { + if (isShutdown()) { + log.debug("Skipping execution of task callback hook "+command+" because executor is shutdown."); + return; + } + try { + target.execute(command); + } catch (Exception e) { + if (isShutdown()) { + log.debug("Ignoring failed execution of task callback hook "+command+" because executor is shutdown."); + } else { + log.warn("Execution of task callback hook "+command+" failed: "+e, e); + } + } + } + protected boolean isShutdown() { + return target instanceof ExecutorService && ((ExecutorService)target).isShutdown(); + } + } + /** registers the listener on this task's {@link ExecutionList}, wrapping the executor in a {@link SubmissionErrorCatchingExecutor} */ + @Override + public void addListener(Runnable listener, Executor executor) { + listeners.add(listener, new SubmissionErrorCatchingExecutor(executor)); + } + + @Override + public void runListeners() { + listeners.execute(); + } + + @Override + public void setEndTimeUtc(long val) { + endTimeUtc = val; + } + + @Override + 
public void setThread(Thread thread) { + this.thread = thread; + } + + @Override + public Callable<T> getJob() { + return job; + } + + @Override + public void setJob(Callable<T> job) { + this.job = job; + } + + @Override + public ExecutionList getListeners() { + return listeners; + } + + @Override + public void setSubmitTimeUtc(long val) { + submitTimeUtc = val; + } + /** + * Builds a placeholder task which takes the display name of the given task and whose body returns null; + * {@link #ignoreIfNotRun()} is applied to it, replacing the default finalizer. + */ + private static <T> Task<T> newGoneTaskFor(Task<?> task) { + Task<T> t = Tasks.<T>builder().dynamic(false).name(task.getDisplayName()) + .description("Details of the original task "+task+" have been forgotten.") + .body(Callables.returning((T)null)).build(); + ((BasicTask<T>)t).ignoreIfNotRun(); + return t; + } + /** + * Records the task which submitted this one. + * NOTE(review): the value is held through {@code Maybe.softThen} with a placeholder from {@code newGoneTaskFor} as the alternative, + * presumably so that the submitting task can be garbage collected; confirm against {@code Maybe.softThen}. + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + @Override + public void setSubmittedByTask(Task<?> task) { + submittedByTask = (Maybe)Maybe.softThen((Task)task, (Maybe)Maybe.of(BasicTask.newGoneTaskFor(task))); + } + /** returns the live tag set itself, in contrast to the copy returned by {@link #getTags()} */ + @Override + public Set<Object> getMutableTags() { + return tags; + } + + @Override + public void setStartTimeUtc(long val) { + startTimeUtc = val; + } + /** applies the given function to the live tag set */ + @Override + public void applyTagModifier(Function<Set<Object>,Void> modifier) { + modifier.apply(tags); + } + + @Override + public Task<?> getProxyTarget() { + return proxyTargetTask; + } +}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/CanSetName.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/CanSetName.java b/core/src/main/java/org/apache/brooklyn/core/util/task/CanSetName.java new file mode 100644 index 0000000..407a93a --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/CanSetName.java @@ -0,0 +1,25 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; +/** implemented by objects which can be given a name through {@link #setName(String)} */ +public interface CanSetName { + + void setName(String name); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/CompoundTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/CompoundTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/CompoundTask.java new file mode 100644 index 0000000..8fdb146 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/CompoundTask.java @@ -0,0 +1,131 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; + +import groovy.lang.Closure; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; + +import org.apache.brooklyn.api.management.HasTaskChildren; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.TaskAdaptable; +import org.apache.brooklyn.core.management.internal.ManagementContextInternal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.util.collections.MutableMap; + + +/** + * A {@link Task} that is comprised of other units of work: possibly a heterogeneous mix of {@link Task}, + * {@link Runnable}, {@link Callable} and {@link Closure} instances. + * + * This class holds the collection of child tasks, but subclasses have the responsibility of executing them in a + * sensible manner by implementing the abstract {@link #runJobs} method. + */ +public abstract class CompoundTask<T> extends BasicTask<List<T>> implements HasTaskChildren { + + @SuppressWarnings("unused") + private static final Logger log = LoggerFactory.getLogger(CompoundTask.class); + + protected final List<Task<? extends T>> children; + protected final List<Object> result; + + /** + * Constructs a new compound task containing the specified units of work. + * + * @param jobs A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided. + * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types + */ + public CompoundTask(Object... jobs) { + this( Arrays.asList(jobs) ); + } + + /** + * Constructs a new compound task containing the specified units of work. 
+ * + * @param jobs A potentially heterogeneous mixture of {@link Runnable}, {@link Callable}, {@link Closure} and {@link Task} can be provided. + * @throws IllegalArgumentException if any of the passed child jobs is not one of the above types + */ + public CompoundTask(Collection<?> jobs) { + this(MutableMap.of("tag", "compound"), jobs); + } + /** + * Constructs a new compound task containing the specified units of work, passing {@code flags} to the {@link BasicTask} constructor. + * <p> + * A job which is {@link TaskAdaptable} contributes its own task; a Closure, Callable or Runnable is wrapped in a new {@link BasicTask}. + * Every child is tagged with {@link ManagementContextInternal#SUB_TASK_TAG} and marked as queued. + * + * @param flags named arguments for the superclass constructor + * @param jobs the child units of work, as for {@link #CompoundTask(Collection)} + * @throws IllegalArgumentException if any of the passed child jobs is not a Runnable, Callable, Closure or Task + */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public CompoundTask(Map<String,?> flags, Collection<?> jobs) { + super(flags); + super.job = new Callable<List<T>>() { + @Override public List<T> call() throws Exception { + return runJobs(); + } + }; + + this.result = new ArrayList<Object>(jobs.size()); + this.children = new ArrayList<Task<? extends T>>(jobs.size()); + for (Object job : jobs) { + Task subtask; + if (job instanceof TaskAdaptable) { subtask = ((TaskAdaptable)job).asTask(); } + else if (job instanceof Closure) { subtask = new BasicTask<T>((Closure) job); } + else if (job instanceof Callable) { subtask = new BasicTask<T>((Callable) job); } + else if (job instanceof Runnable) { subtask = new BasicTask<T>((Runnable) job); } + + else throw new IllegalArgumentException("Invalid child "+(job == null ? 
null : job.getClass() + " ("+job+")")+ + " passed to compound task; must be Runnable, Callable, Closure or Task"); + + BrooklynTaskTags.addTagDynamically(subtask, ManagementContextInternal.SUB_TASK_TAG); + children.add(subtask); + } + + for (Task<?> t: getChildren()) { + ((TaskInternal<?>)t).markQueued(); + } + } + + /** runs the child tasks; this is the body of the job which the constructor installs on this task. + * return value needs to be specified by subclass; subclass should also setBlockingDetails + * @throws ExecutionException + * @throws InterruptedException */ + protected abstract List<T> runJobs() throws InterruptedException, ExecutionException; + /** + * Submits the given task through the current {@link BasicExecutionContext}, unless the task has already been submitted. + * + * @throws IllegalStateException if the task needs submitting but there is no current execution context + */ + protected void submitIfNecessary(TaskAdaptable<?> task) { + if (!task.asTask().isSubmitted()) { + if (BasicExecutionContext.getCurrentExecutionContext() == null) { + throw new IllegalStateException("Compound task ("+task+") launched from "+this+" missing required execution context"); + } else { + BasicExecutionContext.getCurrentExecutionContext().submit(task); + } + } + } + /** returns the list of children held by this task (the internal list itself, not a copy) */ + public List<Task<? extends T>> getChildrenTyped() { + return children; + } + /** as {@link #getChildrenTyped()}, with the same list viewed as a list of wildcard-typed tasks */ + @SuppressWarnings({ "unchecked", "rawtypes" }) + public List<Task<?>> getChildren() { + return (List) getChildrenTyped(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/DeferredSupplier.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/DeferredSupplier.java b/core/src/main/java/org/apache/brooklyn/core/util/task/DeferredSupplier.java new file mode 100644 index 0000000..ad9416b --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/DeferredSupplier.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import com.google.common.base.Supplier; + +/** + * A class that supplies objects of a single type. When used as a ConfigKey value, + * the evaluation is deferred until getConfig() is called. The returned value will then + * be coerced to the correct type. + * + * Subsequent calls to getConfig will result in further calls to deferredProvider.get(), + * rather than reusing the result. If you want to reuse the result, consider instead + * using a Future. + * + * Note that this functionality replaces the use of Closure in brooklyn 0.4.0, which + * served the same purpose. + */ +public interface DeferredSupplier<T> extends Supplier<T> { + @Override + T get(); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicSequentialTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicSequentialTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicSequentialTask.java new file mode 100644 index 0000000..e197705 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicSequentialTask.java @@ -0,0 +1,480 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import groovy.lang.Closure; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; + +import org.apache.brooklyn.api.management.HasTaskChildren; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.TaskQueueingContext; +import org.apache.brooklyn.core.management.internal.ManagementContextInternal; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.BrooklynTaskTags; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.time.CountdownTimer; +import brooklyn.util.time.Duration; + +import com.google.common.annotations.Beta; +import com.google.common.collect.ImmutableList; + +/** Represents a task whose run() method can create other tasks + * which are run sequentially, but that sequence runs in parallel to this task + * <p> + * There is an optional primary job run with this task, along with multiple secondary children. 
+ * If any secondary task fails (assuming it isn't {@link Tasks#markInessential()}) then by default + * subsequent tasks are not submitted and the primary task fails (but no tasks are cancelled or interrupted). + * You can change the behavior of this task with fields in {@link FailureHandlingConfig}, + * or the convenience {@link TaskQueueingContext#swallowChildrenFailures()} + * (and {@link DynamicTasks#swallowChildrenFailures()} if you are inside the task). + * <p> + * This synchronizes on secondary tasks when submitting them, in case they may be manually submitted + * and the submitter wishes to ensure it is only submitted once. + * <p> + * Improvements which would be nice to have: + * <li> unqueued tasks not visible in api; would like that + * <li> uses an extra thread (submitted as background task) to monitor the secondary jobs; would be nice to remove this, + * and rely on {@link BasicExecutionManager} to run the jobs sequentially (combined with fix to item above) + * <li> would be nice to have cancel, resume, and possibly skipQueue available as operations (ideally in the REST API and GUI) + **/ +public class DynamicSequentialTask<T> extends BasicTask<T> implements HasTaskChildren, TaskQueueingContext { + + private static final Logger log = LoggerFactory.getLogger(DynamicSequentialTask.class); + + protected final Queue<Task<?>> secondaryJobsAll = new ConcurrentLinkedQueue<Task<?>>(); /* every secondary task ever queued, in order; backs getChildren() and getQueue() */ + protected final Queue<Task<?>> secondaryJobsRemaining = new ConcurrentLinkedQueue<Task<?>>(); /* queued secondary tasks not yet polled for submission by the internal manager task */ + protected final Object jobTransitionLock = new Object(); /* monitor guarding queue additions and primary/secondary state changes; notified on each change */ + protected volatile boolean primaryStarted = false; + protected volatile boolean primaryFinished = false; + protected volatile boolean secondaryQueueAborted = false; + protected Thread primaryThread; + protected DstJob dstJob; + protected FailureHandlingConfig failureHandlingConfig = FailureHandlingConfig.DEFAULT; + + // default values for how to handle the various failures + @Beta + public static class FailureHandlingConfig { + /** 
secondary queue runs independently of primary task (submitting and blocking on each secondary task in order), + * but can set it up not to submit any more tasks if the primary fails */ + public final boolean abortSecondaryQueueOnPrimaryFailure; + /** as {@link #abortSecondaryQueueOnPrimaryFailure} but controls cancelling of secondary queue*/ + public final boolean cancelSecondariesOnPrimaryFailure; + /** secondary queue can continue submitting+blocking tasks even if a secondary task fails (unusual; + * typically handled by {@link TaskTags#markInessential(Task)} on the secondary tasks, in which case + * the secondary queue is never aborted */ + public final boolean abortSecondaryQueueOnSecondaryFailure; + /** unsubmitted secondary tasks (ie those further in the queue) can be cancelled if a secondary task fails */ + public final boolean cancelSecondariesOnSecondaryFailure; + /** whether to issue cancel against primary task if a secondary task fails */ + public final boolean cancelPrimaryOnSecondaryFailure; + /** whether to fail this task if a secondary task fails */ + public final boolean failParentOnSecondaryFailure; + + @Beta + public FailureHandlingConfig( + boolean abortSecondaryQueueOnPrimaryFailure, boolean cancelSecondariesOnPrimaryFailure, + boolean abortSecondaryQueueOnSecondaryFailure, boolean cancelSecondariesOnSecondaryFailure, + boolean cancelPrimaryOnSecondaryFailure, boolean failParentOnSecondaryFailure) { + this.abortSecondaryQueueOnPrimaryFailure = abortSecondaryQueueOnPrimaryFailure; + this.cancelSecondariesOnPrimaryFailure = cancelSecondariesOnPrimaryFailure; + this.abortSecondaryQueueOnSecondaryFailure = abortSecondaryQueueOnSecondaryFailure; + this.cancelSecondariesOnSecondaryFailure = cancelSecondariesOnSecondaryFailure; + this.cancelPrimaryOnSecondaryFailure = cancelPrimaryOnSecondaryFailure; + this.failParentOnSecondaryFailure = failParentOnSecondaryFailure; + } + + public static final FailureHandlingConfig DEFAULT = new 
FailureHandlingConfig(false, false, true, false, false, true); + public static final FailureHandlingConfig SWALLOWING_CHILDREN_FAILURES = new FailureHandlingConfig(false, false, false, false, false, false); + } + + public static class QueueAbortedException extends IllegalStateException { + private static final long serialVersionUID = -7569362887826818524L; + + public QueueAbortedException(String msg) { + super(msg); + } + public QueueAbortedException(String msg, Throwable cause) { + super(msg, cause); + } + } + + /** + * Constructs a new dynamic sequential task with no primary job. + * <p> + * Work is supplied afterwards through {@link #queue(Task)}; queued tasks are submitted one at a time, in queue order, once this task runs. + * With no primary job, the result of this task is taken from the list of results collected from the queued tasks. + */ + public DynamicSequentialTask() { + this(null); + } + + public DynamicSequentialTask(Callable<T> mainJob) { + this(MutableMap.of("tag", "compound"), mainJob); + } + + public DynamicSequentialTask(Map<?,?> flags, Callable<T> mainJob) { + super(flags); + this.job = dstJob = new DstJob(mainJob); + } + + @Override + public void queue(Task<?> t) { + synchronized (jobTransitionLock) { + if (primaryFinished) + throw new IllegalStateException("Cannot add a task to "+this+" which is already finished (trying to add "+t+")"); + if (secondaryQueueAborted) + throw new QueueAbortedException("Cannot add a task to "+this+" whose queue has been aborted (trying to add "+t+")"); + secondaryJobsAll.add(t); + secondaryJobsRemaining.add(t); + BrooklynTaskTags.addTagsDynamically(t, ManagementContextInternal.SUB_TASK_TAG); + ((TaskInternal<?>)t).markQueued(); + jobTransitionLock.notifyAll(); + } + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + return cancel(mayInterruptIfRunning, mayInterruptIfRunning, true); + } + public boolean cancel(boolean mayInterruptTask, boolean interruptPrimaryThread, boolean alsoCancelChildren) { + if 
(isDone()) return false; + if (log.isTraceEnabled()) log.trace("cancelling {}", this); + boolean cancel = super.cancel(mayInterruptTask); + if (alsoCancelChildren) { + for (Task<?> t: secondaryJobsAll) + cancel |= t.cancel(mayInterruptTask); + } + synchronized (jobTransitionLock) { + if (primaryThread!=null) { + if (interruptPrimaryThread) { + if (log.isTraceEnabled()) log.trace("cancelling {} - interrupting", this); + primaryThread.interrupt(); + } + cancel = true; + } + } + return cancel; + } + + @Override + public synchronized boolean uncancel() { + secondaryQueueAborted = false; + return super.uncancel(); + } + + @Override + public Iterable<Task<?>> getChildren() { + return Collections.unmodifiableCollection(secondaryJobsAll); + } + + /** submits the indicated task for execution in the current execution context, and returns immediately */ + protected void submitBackgroundInheritingContext(Task<?> task) { + BasicExecutionContext ec = BasicExecutionContext.getCurrentExecutionContext(); + if (log.isTraceEnabled()) { + log.trace("task {} - submitting background task {} ({})", new Object[] { Tasks.current(), task, ec }); + } + if (ec==null) { + String message = Tasks.current()!=null ? 
+ // user forgot ExecContext: + "Task "+this+" submitting background task requires an ExecutionContext (an ExecutionManager is not enough): submitting "+task+" in "+Tasks.current() + : // should not happen: + "Cannot submit tasks inside DST when not in a task : submitting "+task+" in "+this; + log.warn(message+" (rethrowing)"); + throw new IllegalStateException(message); + } + synchronized (task) { + if (task.isSubmitted()) { + if (log.isTraceEnabled()) { + log.trace("DST "+this+" skipping submission of child "+task+" because it is already submitted"); + } + } else { + try { + ec.submit(task); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + // Give some context when the submit fails (happens when the target is already unmanaged) + throw new IllegalStateException("Failure submitting task "+task+" in "+this+": "+e.getMessage(), e); + } + } + } + } + + public void setFailureHandlingConfig(FailureHandlingConfig failureHandlingConfig) { + this.failureHandlingConfig = failureHandlingConfig; + } + @Override + public void swallowChildrenFailures() { + setFailureHandlingConfig(FailureHandlingConfig.SWALLOWING_CHILDREN_FAILURES); + } + + protected class DstJob implements Callable<T> { + protected Callable<T> primaryJob; + /** currently executing (or just completed) secondary task, or null if none; + * with jobTransitionLock notified on change and completion */ + protected volatile Task<?> currentSecondary = null; + protected volatile boolean finishedSecondaries = false; + + public DstJob(Callable<T> mainJob) { + this.primaryJob = mainJob; + } + + @SuppressWarnings("unchecked") + @Override + public T call() throws Exception { + + synchronized (jobTransitionLock) { + primaryStarted = true; + primaryThread = Thread.currentThread(); + for (Task<?> t: secondaryJobsAll) + ((TaskInternal<?>)t).markQueued(); + } + // TODO overkill having a thread/task for this, but it works + // optimisation would either use newTaskEndCallback property on task to submit + // or use 
some kind of single threaded executor for the queued tasks + Task<List<Object>> secondaryJobMaster = Tasks.<List<Object>>builder().dynamic(false) + .name("DST manager (internal)") + // TODO marking it transient helps it be GC'd sooner, + // but ideally we wouldn't have this, + // or else it would be a child + .tag(BrooklynTaskTags.TRANSIENT_TASK_TAG) + .body(new Callable<List<Object>>() { + + @Override + public List<Object> call() throws Exception { + List<Object> result = new ArrayList<Object>(); + try { + while (!secondaryQueueAborted && (!primaryFinished || !secondaryJobsRemaining.isEmpty())) { + synchronized (jobTransitionLock) { + if (!primaryFinished && secondaryJobsRemaining.isEmpty()) { + currentSecondary = null; + jobTransitionLock.wait(1000); + } + } + @SuppressWarnings("rawtypes") + Task secondaryJob = secondaryJobsRemaining.poll(); + if (secondaryJob != null) { + synchronized (jobTransitionLock) { + currentSecondary = secondaryJob; + submitBackgroundInheritingContext(secondaryJob); + jobTransitionLock.notifyAll(); + } + try { + result.add(secondaryJob.get()); + } catch (Exception e) { + if (TaskTags.isInessential(secondaryJob)) { + result.add(Tasks.getError(secondaryJob)); + if (log.isDebugEnabled()) + log.debug("Secondary job queue for "+DynamicSequentialTask.this+" ignoring error in inessential task "+secondaryJob+": "+e); + } else { + if (failureHandlingConfig.cancelSecondariesOnSecondaryFailure) { + if (log.isDebugEnabled()) + log.debug("Secondary job queue for "+DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in task "+secondaryJob+": "+e); + synchronized (jobTransitionLock) { + for (Task<?> t: secondaryJobsRemaining) + t.cancel(true); + jobTransitionLock.notifyAll(); + } + } + + if (failureHandlingConfig.abortSecondaryQueueOnSecondaryFailure) { + if (log.isDebugEnabled()) + log.debug("Aborting secondary job queue for "+DynamicSequentialTask.this+" due to error in child task "+secondaryJob+" ("+e+", 
being rethrown)"); + secondaryQueueAborted = true; + throw e; + } + + if (!primaryFinished && failureHandlingConfig.cancelPrimaryOnSecondaryFailure) { + cancel(true, false, false); + } + + result.add(Tasks.getError(secondaryJob)); + if (log.isDebugEnabled()) + log.debug("Secondary job queue for "+DynamicSequentialTask.this+" continuing in presence of error in child task "+secondaryJob+" ("+e+", being remembered)"); + } + } + } + } + } finally { + synchronized (jobTransitionLock) { + currentSecondary = null; + finishedSecondaries = true; + jobTransitionLock.notifyAll(); + } + } + return result; + } + }).build(); + ((BasicTask<?>)secondaryJobMaster).proxyTargetTask = DynamicSequentialTask.this; + + submitBackgroundInheritingContext(secondaryJobMaster); + + T result = null; + Throwable error = null; + Throwable uninterestingSelfError = null; + boolean errorIsFromChild = false; + try { + if (log.isTraceEnabled()) log.trace("calling primary job for {}", this); + if (primaryJob!=null) result = primaryJob.call(); + } catch (Throwable selfException) { + Exceptions.propagateIfFatal(selfException); + if (Exceptions.getFirstThrowableOfType(selfException, QueueAbortedException.class) != null) { + // Error was caused by the task already having failed, and this thread calling queue() to try + // to queue more work. The underlying cause will be much more interesting. + // Without this special catch, we record error = "Cannot add a task to ... whose queue has been aborted", + // which gets propagated instead of the more interesting child exception. 
+ uninterestingSelfError = selfException; + } else { + error = selfException; + errorIsFromChild = false; + } + if (failureHandlingConfig.abortSecondaryQueueOnPrimaryFailure) { + if (log.isDebugEnabled()) + log.debug("Secondary job queue for "+DynamicSequentialTask.this+" aborting with "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException); + secondaryQueueAborted = true; + } + if (failureHandlingConfig.cancelSecondariesOnPrimaryFailure) { + if (log.isDebugEnabled()) + log.debug(DynamicSequentialTask.this+" cancelling "+secondaryJobsRemaining.size()+" remaining, due to error in primary task: "+selfException); + synchronized (jobTransitionLock) { + for (Task<?> t: secondaryJobsRemaining) + t.cancel(true); + // do this early to prevent additions; and note we notify very soon below, so not notify is help off until below + primaryThread = null; + primaryFinished = true; + } + } + } finally { + try { + if (log.isTraceEnabled()) log.trace("cleaning up for {}", this); + synchronized (jobTransitionLock) { + // semaphore might be nicer here (aled notes as it is this is a little hard to read) + primaryThread = null; + primaryFinished = true; + jobTransitionLock.notifyAll(); + } + if (!isCancelled() && !Thread.currentThread().isInterrupted()) { + if (log.isTraceEnabled()) log.trace("waiting for secondaries for {}", this); + // wait on tasks sequentially so that blocking information is more interesting + DynamicTasks.waitForLast(); + List<Object> result2 = secondaryJobMaster.get(); + try { + if (primaryJob==null) result = (T)result2; + } catch (ClassCastException e) { /* ignore class cast exception; leave the result as null */ } + } + } catch (Throwable childException) { + Exceptions.propagateIfFatal(childException); + if (error==null) { + error = childException; + errorIsFromChild = true; + } else { + if (log.isDebugEnabled()) log.debug("Parent task "+this+" ignoring child error ("+childException+") in presence of our own error 
("+error+")"); + } + } + } + if (error!=null) { + handleException(error, errorIsFromChild); + } + if (uninterestingSelfError != null) { + handleException(uninterestingSelfError, false); + } + return result; + } + + @Override + public String toString() { + return "DstJob:"+DynamicSequentialTask.this.getId(); + } + + /** waits for this job to complete, or the given time to elapse */ + public void join(boolean includePrimary, Duration optionalTimeout) throws InterruptedException { + CountdownTimer timeLeft = optionalTimeout!=null ? CountdownTimer.newInstanceStarted(optionalTimeout) : null; + while (true) { + Task<?> cs; + Duration remaining; + synchronized (jobTransitionLock) { + cs = currentSecondary; + if (finishedSecondaries) return; + remaining = timeLeft==null ? Duration.ONE_SECOND : timeLeft.getDurationRemaining(); + if (!remaining.isPositive()) return; + if (cs==null) { + if (!includePrimary && secondaryJobsRemaining.isEmpty()) return; + // parent still running, no children though + Tasks.setBlockingTask(DynamicSequentialTask.this); + jobTransitionLock.wait(remaining.toMilliseconds()); + Tasks.resetBlockingDetails(); + } + } + if (cs!=null) { + Tasks.setBlockingTask(cs); + cs.blockUntilEnded(remaining); + Tasks.resetBlockingDetails(); + } + } + } + } + + @Override + public List<Task<?>> getQueue() { + return ImmutableList.copyOf(secondaryJobsAll); + } + + public void handleException(Throwable throwable, boolean fromChild) throws Exception { + Exceptions.propagateIfFatal(throwable); + if (fromChild && !failureHandlingConfig.failParentOnSecondaryFailure) { + log.debug("Parent task "+this+" swallowing child error: "+throwable); + return; + } + handleException(throwable); + } + public void handleException(Throwable throwable) throws Exception { + Exceptions.propagateIfFatal(throwable); + if (throwable instanceof Exception) { + // allow checked exceptions to be passed through + throw (Exception)throwable; + } + throw Exceptions.propagate(throwable); + } + + 
@Override + public void drain(Duration optionalTimeout, boolean includePrimary, boolean throwFirstError) { + try { + dstJob.join(includePrimary, optionalTimeout); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + if (throwFirstError) { + if (isError()) + getUnchecked(); + for (Task<?> t: getQueue()) + if (t.isError() && !TaskTags.isInessential(t)) + t.getUnchecked(); + } + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicTasks.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicTasks.java b/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicTasks.java new file mode 100644 index 0000000..ed46558 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/DynamicTasks.java @@ -0,0 +1,337 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; + +import java.util.List; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.entity.Entity; +import org.apache.brooklyn.api.management.ExecutionContext; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.TaskAdaptable; +import org.apache.brooklyn.api.management.TaskFactory; +import org.apache.brooklyn.api.management.TaskQueueingContext; +import org.apache.brooklyn.api.management.TaskWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.entity.basic.EntityInternal; +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.time.Duration; + +import com.google.common.annotations.Beta; +import com.google.common.base.Preconditions; +import com.google.common.collect.Iterables; + +/** + * Contains static methods which detect and use the current {@link TaskQueueingContext} to execute tasks. + * + * @since 0.6.0 + */ +@Beta +public class DynamicTasks { + + private static final Logger log = LoggerFactory.getLogger(DynamicTasks.class); + private static final ThreadLocal<TaskQueueingContext> taskQueueingContext = new ThreadLocal<TaskQueueingContext>(); + + public static void setTaskQueueingContext(TaskQueueingContext newTaskQC) { + taskQueueingContext.set(newTaskQC); + } + + public static TaskQueueingContext getThreadTaskQueuingContext() { + return taskQueueingContext.get(); + } + + public static TaskQueueingContext getTaskQueuingContext() { + TaskQueueingContext adder = getThreadTaskQueuingContext(); + if (adder!=null) return adder; + Task<?> t = Tasks.current(); + if (t instanceof TaskQueueingContext) return (TaskQueueingContext) t; + return null; + } + + + public static void removeTaskQueueingContext() { + taskQueueingContext.remove(); + } + + public static class TaskQueueingResult<T> implements TaskWrapper<T> { + private final Task<T> task; + private final boolean wasQueued; + private ExecutionContext execContext = null; 
+ + private TaskQueueingResult(TaskAdaptable<T> task, boolean wasQueued) { + this.task = task.asTask(); + this.wasQueued = wasQueued; + } + @Override + public Task<T> asTask() { + return task; + } + @Override + public Task<T> getTask() { + return task; + } + /** returns true if the task was queued */ + public boolean wasQueued() { + return wasQueued; + } + /** returns true if the task either is currently queued or has been submitted */ + public boolean isQueuedOrSubmitted() { + return wasQueued || Tasks.isQueuedOrSubmitted(task); + } + /** specifies an execContext to use if the task has to be explicitly submitted; + * if omitted it will attempt to find one based on the current thread's context */ + public TaskQueueingResult<T> executionContext(ExecutionContext execContext) { + this.execContext = execContext; + return this; + } + /** as {@link #executionContext(ExecutionContext)} but inferring from the entity */ + public TaskQueueingResult<T> executionContext(Entity entity) { + this.execContext = ((EntityInternal)entity).getManagementSupport().getExecutionContext(); + return this; + } + private boolean orSubmitInternal() { + if (!wasQueued()) { + if (isQueuedOrSubmitted()) { + log.warn("Redundant call to execute "+getTask()+"; skipping"); + return false; + } else { + ExecutionContext ec = execContext; + if (ec==null) + ec = BasicExecutionContext.getCurrentExecutionContext(); + if (ec==null) + throw new IllegalStateException("Cannot execute "+getTask()+" without an execution context; ensure caller is in an ExecutionContext"); + ec.submit(getTask()); + return true; + } + } else { + return false; + } + } + /** causes the task to be submitted (asynchronously) if it hasn't already been, + * requiring an entity execution context (will try to find a default if not set) */ + public TaskQueueingResult<T> orSubmitAsync() { + orSubmitInternal(); + return this; + } + /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting async */ + public 
TaskQueueingResult<T> orSubmitAsync(Entity entity) { + executionContext(entity); + return orSubmitAsync(); + } + /** causes the task to be submitted *synchronously* if it hasn't already been submitted; + * useful in contexts such as libraries where callers may be either on a legacy call path + * (which assumes all commands complete immediately); + * requiring an entity execution context (will try to find a default if not set) */ + public TaskQueueingResult<T> orSubmitAndBlock() { + if (orSubmitInternal()) task.getUnchecked(); + return this; + } + /** convenience for setting {@link #executionContext(ExecutionContext)} then submitting blocking */ + public TaskQueueingResult<T> orSubmitAndBlock(Entity entity) { + executionContext(entity); + return orSubmitAndBlock(); + } + /** blocks for the task to be completed + * <p> + * needed in any context where subsequent commands assume the task has completed. + * not needed in a context where the task is simply being built up and queued. + * <p> + * throws if there are any errors + */ + public T andWaitForSuccess() { + return task.getUnchecked(); + } + public void orCancel() { + if (!wasQueued()) { + task.cancel(false); + } + } + } + + /** + * Tries to add the task to the current addition context if there is one, otherwise does nothing. + * <p/> + * Call {@link TaskQueueingResult#orSubmitAsync() orSubmitAsync()} on the returned + * {@link TaskQueueingResult TaskQueueingResult} to handle execution of tasks in a + * {@link BasicExecutionContext}. + */ + public static <T> TaskQueueingResult<T> queueIfPossible(TaskAdaptable<T> task) { + TaskQueueingContext adder = getTaskQueuingContext(); + boolean result = false; + if (adder!=null) + result = Tasks.tryQueueing(adder, task); + return new TaskQueueingResult<T>(task, result); + } + + /** @see #queueIfPossible(TaskAdaptable) */ + public static <T> TaskQueueingResult<T> queueIfPossible(TaskFactory<? 
extends TaskAdaptable<T>> task) { + return queueIfPossible(task.newTask()); + } + + /** adds the given task to the nearest task addition context, + * either set as a thread-local, or in the current task, or the submitter of the task, etc + * <p> + * throws {@link IllegalStateException} if the task is already queued or submitted, or if no context in the hierarchy accepts it */ + public static <T> Task<T> queueInTaskHierarchy(Task<T> task) { + Preconditions.checkNotNull(task, "Task to queue cannot be null"); + Preconditions.checkState(!Tasks.isQueuedOrSubmitted(task), "Task to queue must not yet be submitted: %s", task); + + TaskQueueingContext adder = getTaskQueuingContext(); + if (adder!=null) { + if (Tasks.tryQueueing(adder, task)) { + log.debug("Queued task {} at context {} (no hierarchy)", task, adder); + return task; + } + } + + Task<?> t = Tasks.current(); + Preconditions.checkState(t!=null || adder!=null, "No task addition context available for queueing task "+task); + + while (t!=null) { + if (t instanceof TaskQueueingContext) { + if (Tasks.tryQueueing((TaskQueueingContext)t, task)) { + log.debug("Queued task {} at hierarchical context {}", task, t); + return task; + } + } + t = t.getSubmittedByTask(); + } + + throw new IllegalStateException("No task addition context available in current task hierarchy for adding task "+task); + } + + /** + * Queues the given task. + * <p/> + * This method is only valid within a dynamic task. Use {@link #queueIfPossible(TaskAdaptable)} + * and {@link TaskQueueingResult#orSubmitAsync()} if the calling context is a basic task. 
+ * + * @param task The task to queue + * @throws IllegalStateException if no task queueing context is available + * @return The queued task + */ + public static <V extends TaskAdaptable<?>> V queue(V task) { + try { + Preconditions.checkNotNull(task, "Task to queue cannot be null"); + Preconditions.checkState(!Tasks.isQueued(task), "Task to queue must not yet be queued: %s", task); + TaskQueueingContext adder = getTaskQueuingContext(); + if (adder==null) { + throw new IllegalStateException("Task "+task+" cannot be queued here; no queueing context available"); + } + adder.queue(task.asTask()); + return task; + } catch (Throwable e) { + log.warn("Error queueing "+task+" (rethrowing): "+e); + throw Exceptions.propagate(e); + } + } + + /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */ + public static void queue(TaskAdaptable<?> task1, TaskAdaptable<?> task2, TaskAdaptable<?> ...tasks) { + queue(task1); + queue(task2); + for (TaskAdaptable<?> task: tasks) queue(task); + } + + /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */ + public static <T extends TaskAdaptable<?>> T queue(TaskFactory<T> taskFactory) { + return queue(taskFactory.newTask()); + } + + /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */ + public static void queue(TaskFactory<?> task1, TaskFactory<?> task2, TaskFactory<?> ...tasks) { + queue(task1.newTask()); + queue(task2.newTask()); + for (TaskFactory<?> task: tasks) queue(task.newTask()); + } + + /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */ + public static <T> Task<T> queue(String name, Callable<T> job) { + return DynamicTasks.queue(Tasks.<T>builder().name(name).body(job).build()); + } + + /** @see #queue(org.apache.brooklyn.api.management.TaskAdaptable) */ + public static <T> Task<T> queue(String name, Runnable job) { + return DynamicTasks.queue(Tasks.<T>builder().name(name).body(job).build()); + } + + /** queues the task if needed, i.e. 
if it is not yet submitted (so it will run), + * or if it is submitted but not queued and we are in a queueing context (so it is available for informational purposes) */ + public static <T extends TaskAdaptable<?>> T queueIfNeeded(T task) { + if (!Tasks.isQueued(task)) { + if (Tasks.isSubmitted(task) && getTaskQueuingContext()==null) { + // already submitted and not in a queueing context, don't try to queue + } else { + // needs submitting, put it in the queue + // (will throw an error if we are not a queueing context) + queue(task); + } + } + return task; + } + + /** submits/queues the given task if needed, and gets the result (unchecked) + * only permitted in a queueing context (ie a DST main job) if the task is not yet submitted */ + // things get really confusing if you try to queueInTaskHierarchy -- easy to cause deadlocks! + public static <T> T get(TaskAdaptable<T> t) { + return queueIfNeeded(t).asTask().getUnchecked(); + } + + /** As {@link #drain(Duration, boolean)} waiting forever and throwing the first error + * (excluding errors in inessential tasks), + * then returning the last task in the queue (which is guaranteed to have finished without error, + * if this method returns without throwing) */ + public static Task<?> waitForLast() { + drain(null, true); + // this call to last is safe, as the above guarantees everything will have run + // (on errors the above will throw so we won't come here) + List<Task<?>> q = DynamicTasks.getTaskQueuingContext().getQueue(); + return q.isEmpty() ? 
null : Iterables.getLast(q); + } + + /** Calls {@link TaskQueueingContext#drain(Duration, boolean, boolean)} on the current task context */ + public static TaskQueueingContext drain(Duration optionalTimeout, boolean throwFirstError) { + TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext(); + Preconditions.checkNotNull(qc, "Cannot drain when there is no queueing context"); + qc.drain(optionalTimeout, false, throwFirstError); + return qc; + } + + /** as {@link Tasks#swallowChildrenFailures()} but requiring a {@link TaskQueueingContext}. */ + @Beta + public static void swallowChildrenFailures() { + Preconditions.checkNotNull(DynamicTasks.getTaskQueuingContext(), "Task queueing context required here"); + Tasks.swallowChildrenFailures(); + } + + /** same as {@link Tasks#markInessential()} + * (but included here for convenience as it is often used in conjunction with {@link DynamicTasks}) */ + public static void markInessential() { + Tasks.markInessential(); + } + + /** queues the task if possible, otherwise submits it asynchronously; returns the task for callers to + * {@link Task#getUnchecked()} or {@link Task#blockUntilEnded()} */ + public static <T> Task<T> submit(TaskAdaptable<T> task, Entity entity) { + return queueIfPossible(task).orSubmitAsync(entity).asTask(); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionListener.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionListener.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionListener.java new file mode 100644 index 0000000..5341b21 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionListener.java @@ -0,0 +1,31 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import org.apache.brooklyn.api.management.Task; + +public interface ExecutionListener { + + /** invoked when a task completes: + * {@link Task#getEndTimeUtc()} and {@link Task#isDone()} are guaranteed to be set, + * and {@link Task#get()} should return immediately for most Task implementations + * (care has been taken to avoid potential deadlocks here, waiting for a result!) */ + public void onTaskDone(Task<?> task); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionUtils.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionUtils.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionUtils.java new file mode 100644 index 0000000..be677e3 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ExecutionUtils.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */
package org.apache.brooklyn.core.util.task;

import groovy.lang.Closure;

import java.util.concurrent.Callable;

import com.google.common.base.Function;
import com.google.common.base.Throwables;

/** Static helper for invoking loosely-typed callable objects. */
public class ExecutionUtils {
    /**
     * Attempts to run/call the given object, with the given arguments if possible, preserving the return value if there is one (null otherwise);
     * throws exception if the callable is a non-null object which cannot be invoked (not a callable or runnable).
     * <p>
     * The supported types are checked in this order:
     * <ul>
     * <li> {@link Closure}: called with {@code args}, returning its result
     * <li> {@link Callable}: called (the {@code args} are ignored), returning its result;
     *      anything it throws is rethrown via {@link Throwables#propagate(Throwable)}
     * <li> {@link Runnable}: run (the {@code args} are ignored), returning null
     * <li> {@link Function}, only when exactly one argument is supplied: applied to that argument, returning its result
     * <li> {@code null}: nothing is invoked and null is returned
     * </ul>
     *
     * @param callable the object to invoke; may be null
     * @param args the arguments to pass where the callable accepts them; a null array is counted as zero arguments
     * @return the result of the invocation, or null if there is none
     * @throws IllegalArgumentException if {@code callable} is non-null and cannot be invoked with the given arguments
     * @deprecated since 0.7.0 ; this super-loose typing should be avoided; if it is needed, let's move it to one of the Groovy compatibility classes
     */
    @Deprecated
    @SuppressWarnings({ "unchecked", "rawtypes" })
    public static Object invoke(Object callable, Object ...args) {
        // an explicitly-passed null array must not cause a NullPointerException on the length checks below
        final int argCount = (args == null) ? 0 : args.length;

        if (callable instanceof Closure) return ((Closure<?>)callable).call(args);
        if (callable instanceof Callable) {
            try {
                return ((Callable<?>)callable).call();
            } catch (Throwable t) {
                throw Throwables.propagate(t);
            }
        }
        if (callable instanceof Runnable) { ((Runnable)callable).run(); return null; }
        if (callable instanceof Function && argCount == 1) { return ((Function)callable).apply(args[0]); }
        if (callable==null) return null;
        throw new IllegalArgumentException("Cannot invoke unexpected object "+callable+" of type "+callable.getClass()+", with "+argCount+" args");
    }
}