http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ForwardingTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ForwardingTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ForwardingTask.java new file mode 100644 index 0000000..794dea9 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ForwardingTask.java @@ -0,0 +1,325 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
 */
package org.apache.brooklyn.core.util.task;

import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.apache.brooklyn.api.management.Task;

import brooklyn.util.time.Duration;

import com.google.common.base.Function;
import com.google.common.collect.ForwardingObject;
import com.google.common.util.concurrent.ExecutionList;
import com.google.common.util.concurrent.ListenableFuture;

/**
 * A {@link TaskInternal} whose every method simply forwards to the task returned by
 * {@link #delegate()}.
 * <p>
 * Subclasses supply the delegate and override only the methods whose behaviour they want to
 * change. No state is held here: {@link #delegate()} is called afresh on each invocation, so
 * a subclass may return a different delegate over time.
 *
 * @param <T> the result type of the wrapped task
 */
public abstract class ForwardingTask<T> extends ForwardingObject implements TaskInternal<T> {

    /** Constructor for use by subclasses. */
    protected ForwardingTask() {}

    /** Returns the task that all calls on this instance are forwarded to. */
    @Override
    protected abstract TaskInternal<T> delegate();

    @Override
    public void addListener(Runnable listener, Executor executor) {
        delegate().addListener(listener, executor);
    }

    @Override
    public boolean cancel(boolean arg0) {
        return delegate().cancel(arg0);
    }

    @Override
    public T get() throws InterruptedException, ExecutionException {
        return delegate().get();
    }

    @Override
    public T get(long arg0, TimeUnit arg1) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate().get(arg0, arg1);
    }

    @Override
    public boolean isCancelled() {
        return delegate().isCancelled();
    }

    @Override
    public boolean isDone() {
        return delegate().isDone();
    }

    // NOTE(review): this returns the delegate's own task view, not this wrapper -- confirm that
    // callers do not expect to get the forwarding instance back.
    @Override
    public Task<T> asTask() {
        return delegate().asTask();
    }

    @Override
    public String getId() {
        return delegate().getId();
    }

    @Override
    public Set<Object> getTags() {
        return delegate().getTags();
    }

    @Override
    public long getSubmitTimeUtc() {
        return delegate().getSubmitTimeUtc();
    }

    @Override
    public long getStartTimeUtc() {
        return delegate().getStartTimeUtc();
    }

    @Override
    public long getEndTimeUtc() {
        return delegate().getEndTimeUtc();
    }

    @Override
    public String getDisplayName() {
        return delegate().getDisplayName();
    }

    @Override
    public String getDescription() {
        return delegate().getDescription();
    }

    @Override
    public Task<?> getSubmittedByTask() {
        return delegate().getSubmittedByTask();
    }

    @Override
    public Thread getThread() {
        return delegate().getThread();
    }

    @Override
    public boolean isSubmitted() {
        return delegate().isSubmitted();
    }

    @Override
    public boolean isBegun() {
        return delegate().isBegun();
    }

    @Override
    public boolean isError() {
        return delegate().isError();
    }

    @Override
    public void blockUntilStarted() {
        delegate().blockUntilStarted();
    }

    @Override
    public void blockUntilEnded() {
        delegate().blockUntilEnded();
    }

    @Override
    public boolean blockUntilEnded(Duration timeout) {
        return delegate().blockUntilEnded(timeout);
    }

    @Override
    public String getStatusSummary() {
        return delegate().getStatusSummary();
    }

    @Override
    public String getStatusDetail(boolean multiline) {
        return delegate().getStatusDetail(multiline);
    }

    @Override
    public T get(Duration duration) throws InterruptedException, ExecutionException, TimeoutException {
        return delegate().get(duration);
    }

    @Override
    public T getUnchecked() {
        return delegate().getUnchecked();
    }

    @Override
    public T getUnchecked(Duration duration) {
        return delegate().getUnchecked(duration);
    }

    @Override
    public void initInternalFuture(ListenableFuture<T> result) {
        delegate().initInternalFuture(result);
    }

    @Override
    public long getQueuedTimeUtc() {
        return delegate().getQueuedTimeUtc();
    }

    @Override
    public Future<T> getInternalFuture() {
        return delegate().getInternalFuture();
    }

    @Override
    public boolean isQueued() {
        return delegate().isQueued();
    }

    @Override
    public boolean isQueuedOrSubmitted() {
        return delegate().isQueuedOrSubmitted();
    }

    @Override
    public boolean isQueuedAndNotSubmitted() {
        return delegate().isQueuedAndNotSubmitted();
    }

    @Override
    public void markQueued() {
        delegate().markQueued();
    }

    @Override
    public boolean cancel() {
        return delegate().cancel();
    }

    @Override
    public boolean blockUntilStarted(Duration timeout) {
        return delegate().blockUntilStarted(timeout);
    }

    @Override
    public String setBlockingDetails(String blockingDetails) {
        return delegate().setBlockingDetails(blockingDetails);
    }

    @Override
    public Task<?> setBlockingTask(Task<?> blockingTask) {
        return delegate().setBlockingTask(blockingTask);
    }

    @Override
    public void resetBlockingDetails() {
        delegate().resetBlockingDetails();
    }

    @Override
    public void resetBlockingTask() {
        delegate().resetBlockingTask();
    }

    @Override
    public String getBlockingDetails() {
        return delegate().getBlockingDetails();
    }

    @Override
    public Task<?> getBlockingTask() {
        return delegate().getBlockingTask();
    }

    @Override
    public void setExtraStatusText(Object extraStatus) {
        delegate().setExtraStatusText(extraStatus);
    }

    @Override
    public Object getExtraStatusText() {
        return delegate().getExtraStatusText();
    }

    @Override
    public void runListeners() {
        delegate().runListeners();
    }

    @Override
    public void setEndTimeUtc(long val) {
        delegate().setEndTimeUtc(val);
    }

    @Override
    public void setThread(Thread thread) {
        delegate().setThread(thread);
    }

    @Override
    public Callable<T> getJob() {
        return delegate().getJob();
    }

    @Override
    public void setJob(Callable<T> job) {
        delegate().setJob(job);
    }

    @Override
    public ExecutionList getListeners() {
        return delegate().getListeners();
    }

    @Override
    public void setSubmitTimeUtc(long currentTimeMillis) {
        delegate().setSubmitTimeUtc(currentTimeMillis);
    }

    @Override
    public void setSubmittedByTask(Task<?> task) {
        delegate().setSubmittedByTask(task);
    }

    @Override
    public Set<Object> getMutableTags() {
        return delegate().getMutableTags();
    }

    @Override
    public void setStartTimeUtc(long currentTimeMillis) {
        delegate().setStartTimeUtc(currentTimeMillis);
    }

    @Override
    public void applyTagModifier(Function<Set<Object>, Void> modifier) {
        delegate().applyTagModifier(modifier);
    }

    @Override
    public Task<?> getProxyTarget() {
        return delegate().getProxyTarget();
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ListenableForwardingFuture.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ListenableForwardingFuture.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ListenableForwardingFuture.java new file mode 100644 index 0000000..bfe88b0 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ListenableForwardingFuture.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; + +import java.util.concurrent.Executor; +import java.util.concurrent.Future; + +import com.google.common.util.concurrent.ExecutionList; +import com.google.common.util.concurrent.ForwardingFuture.SimpleForwardingFuture; +import com.google.common.util.concurrent.ListenableFuture; + +/** Wraps a Future, making it a ListenableForwardingFuture, but with the caller having the resposibility to: + * <li> invoke the listeners on job completion (success or error) + * <li> invoke the listeners on cancel */ +public abstract class ListenableForwardingFuture<T> extends SimpleForwardingFuture<T> implements ListenableFuture<T> { + + final ExecutionList listeners; + + protected ListenableForwardingFuture(Future<T> delegate) { + super(delegate); + this.listeners = new ExecutionList(); + } + + protected ListenableForwardingFuture(Future<T> delegate, ExecutionList list) { + super(delegate); + this.listeners = list; + } + + @Override + public void addListener(Runnable listener, Executor executor) { + listeners.add(listener, executor); + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ParallelTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ParallelTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ParallelTask.java new file mode 100644 index 0000000..10da414 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ParallelTask.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.apache.brooklyn.api.management.Task; + +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.text.Strings; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.Lists; + +/** + * Runs {@link Task}s in parallel. + * + * No guarantees of order of starting the tasks, but the return value is a + * {@link List} of the return values of supplied tasks in the same + * order they were passed as arguments. + */ +public class ParallelTask<T> extends CompoundTask<T> { + public ParallelTask(Object... tasks) { super(tasks); } + + public ParallelTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); } + public ParallelTask(Collection<? extends Object> tasks) { super(tasks); } + + public ParallelTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); } + public ParallelTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); } + + @Override + protected List<T> runJobs() throws InterruptedException, ExecutionException { + setBlockingDetails("Executing "+ + (children.size()==1 ? "1 child task" : + children.size()+" children tasks in parallel") ); + for (Task<? 
extends T> task : children) { + submitIfNecessary(task); + } + + List<T> result = Lists.newArrayList(); + List<Exception> exceptions = Lists.newArrayList(); + for (Task<? extends T> task : children) { + T x; + try { + x = task.get(); + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + if (TaskTags.isInessential(task)) { + // ignore exception as it's inessential + } else { + exceptions.add(e); + } + x = null; + } + result.add(x); + } + + if (exceptions.isEmpty()) { + return result; + } else { + if (result.size()==1 && exceptions.size()==1) + throw Exceptions.propagate( exceptions.get(0) ); + throw Exceptions.propagate(exceptions.size()+" of "+result.size()+" parallel child task"+Strings.s(result.size())+" failed", exceptions); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/ScheduledTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/ScheduledTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/ScheduledTask.java new file mode 100644 index 0000000..5c4b208 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/ScheduledTask.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import static brooklyn.util.GroovyJavaMethods.elvis; +import static brooklyn.util.GroovyJavaMethods.truth; + +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +import org.apache.brooklyn.api.management.Task; + +import brooklyn.util.collections.MutableMap; +import brooklyn.util.time.Duration; + +import com.google.common.annotations.Beta; +import com.google.common.base.Throwables; + +/** + * A task which runs with a fixed period. + * <p> + * Note that some termination logic, including {@link #addListener(Runnable, java.util.concurrent.Executor)}, + * is not precisely defined. + */ +// TODO ScheduledTask is a very pragmatic implementation; would be nice to tighten, +// reduce external assumptions about internal structure, and clarify "done" semantics +public class ScheduledTask extends BasicTask { + + final Callable<Task<?>> taskFactory; + /** initial delay before running, set as flag in constructor; defaults to 0 */ + protected Duration delay; + /** time to wait between executions, or null if not to repeat (default), set as flag to constructor; + * this may be modified for subsequent submissions by a running task generated by the factory + * using getSubmittedByTask().setPeriod(Duration) */ + protected Duration period = null; + /** optional, set as flag in constructor; defaults to null meaning no limit */ + protected Integer maxIterations = null; + + protected int runCount=0; + protected Task<?> recentRun, nextRun; + + public int getRunCount() { return runCount; } + public ScheduledFuture<?> getNextScheduled() { return (ScheduledFuture<?>)internalFuture; } + + public ScheduledTask(Callable<Task<?>> taskFactory) { 
+ this(MutableMap.of(), taskFactory); + } + + public ScheduledTask(final Task<?> task) { + this(MutableMap.of(), task); + } + + public ScheduledTask(Map flags, final Task<?> task){ + this(flags, new Callable<Task<?>>(){ + @Override + public Task<?> call() throws Exception { + return task; + }}); + } + + public ScheduledTask(Map flags, Callable<Task<?>> taskFactory) { + super(flags); + this.taskFactory = taskFactory; + + delay = Duration.of(elvis(flags.remove("delay"), 0)); + period = Duration.of(elvis(flags.remove("period"), null)); + maxIterations = elvis(flags.remove("maxIterations"), null); + } + + public ScheduledTask delay(Duration d) { + this.delay = d; + return this; + } + public ScheduledTask delay(long val) { + return delay(Duration.millis(val)); + } + + public ScheduledTask period(Duration d) { + this.period = d; + return this; + } + public ScheduledTask period(long val) { + return period(Duration.millis(val)); + } + + public ScheduledTask maxIterations(int val) { + this.maxIterations = val; + return this; + } + + public Callable<Task<?>> getTaskFactory() { + return taskFactory; + } + + public Task<?> newTask() { + try { + return taskFactory.call(); + } catch (Exception e) { + throw Throwables.propagate(e); + } + } + + protected String getActiveTaskStatusString(int verbosity) { + StringBuilder rv = new StringBuilder("Scheduler"); + if (runCount>0) rv.append(", iteration "+(runCount+1)); + if (recentRun!=null) rv.append(", last run "+ + Duration.sinceUtc(recentRun.getStartTimeUtc())+" ms ago"); + if (truth(getNextScheduled())) { + Duration untilNext = Duration.millis(getNextScheduled().getDelay(TimeUnit.MILLISECONDS)); + if (untilNext.isPositive()) + rv.append(", next in "+untilNext); + else + rv.append(", next imminent"); + } + return rv.toString(); + } + + @Override + public boolean isDone() { + return isCancelled() || (maxIterations!=null && maxIterations <= runCount) || (period==null && nextRun!=null && nextRun.isDone()); + } + + public synchronized 
void blockUntilFirstScheduleStarted() { + // TODO Assumes that maxIterations is not negative! + while (true) { + if (isCancelled()) throw new CancellationException(); + if (recentRun==null) + try { + wait(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + Throwables.propagate(e); + } + if (recentRun!=null) return; + } + } + + public void blockUntilEnded() { + while (!isDone()) super.blockUntilEnded(); + } + + /** gets the value of the most recently run task */ + public Object get() throws InterruptedException, ExecutionException { + blockUntilStarted(); + blockUntilFirstScheduleStarted(); + return (truth(recentRun)) ? recentRun.get() : internalFuture.get(); + } + + @Override + public synchronized boolean cancel(boolean mayInterrupt) { + boolean result = super.cancel(mayInterrupt); + if (nextRun!=null) { + nextRun.cancel(mayInterrupt); + notifyAll(); + } + return result; + } + + /** internal method used to allow callers to wait for underlying tasks to finished in the case of cancellation + * @param duration */ + @Beta + public boolean blockUntilNextRunFinished(Duration timeout) { + return Tasks.blockUntilInternalTasksEnded(nextRun, timeout); + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/SequentialTask.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/SequentialTask.java b/core/src/main/java/org/apache/brooklyn/core/util/task/SequentialTask.java new file mode 100644 index 0000000..93ecdf6 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/SequentialTask.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ExecutionException; + +import org.apache.brooklyn.api.management.Task; + +import com.google.common.collect.ImmutableList; + + +/** runs tasks in order, waiting for one to finish before starting the next; return value here is TBD; + * (currently is all the return values of individual tasks, but we + * might want some pipeline support and eventually only to return final value...) */ +public class SequentialTask<T> extends CompoundTask<T> { + + public SequentialTask(Object... tasks) { super(tasks); } + + public SequentialTask(Map<String,?> flags, Collection<? extends Object> tasks) { super(flags, tasks); } + public SequentialTask(Collection<? extends Object> tasks) { super(tasks); } + + public SequentialTask(Map<String,?> flags, Iterable<? extends Object> tasks) { super(flags, ImmutableList.copyOf(tasks)); } + public SequentialTask(Iterable<? extends Object> tasks) { super(ImmutableList.copyOf(tasks)); } + + protected List<T> runJobs() throws InterruptedException, ExecutionException { + setBlockingDetails("Executing "+ + (children.size()==1 ? "1 child task" : + children.size()+" children tasks sequentially") ); + + List<T> result = new ArrayList<T>(); + for (Task<? 
extends T> task : children) { + submitIfNecessary(task); + // throw exception (and cancel subsequent tasks) on error + result.add(task.get()); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/SingleThreadedScheduler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/SingleThreadedScheduler.java b/core/src/main/java/org/apache/brooklyn/core/util/task/SingleThreadedScheduler.java new file mode 100644 index 0000000..2a5b51d --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/SingleThreadedScheduler.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; + +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; + +import org.apache.brooklyn.api.management.Task; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Instances of this class ensures that {@link Task}s execute with in-order + * single-threaded semantics. + * + * Tasks can be presented through {@link #submit(Callable)}. The order of execution is the + * sumbission order. + * <p> + * This implementation does so by blocking on a {@link ConcurrentLinkedQueue}, <em>after</em> + * the task is started in a thread (and {@link Task#isBegun()} returns true), but (of course) + * <em>before</em> the {@link TaskInternal#getJob()} actually gets invoked. + */ +public class SingleThreadedScheduler implements TaskScheduler, CanSetName { + private static final Logger LOG = LoggerFactory.getLogger(SingleThreadedScheduler.class); + + private final Queue<QueuedSubmission<?>> order = new ConcurrentLinkedQueue<QueuedSubmission<?>>(); + private int queueSize = 0; + private final AtomicBoolean running = new AtomicBoolean(false); + + private ExecutorService executor; + + private String name; + + @Override + public void setName(String name) { + this.name = name; + } + + @Override + public String toString() { + return name!=null ? 
"SingleThreadedScheduler["+name+"]" : super.toString(); + } + + @Override + public void injectExecutor(ExecutorService executor) { + this.executor = executor; + } + + @Override + public synchronized <T> Future<T> submit(Callable<T> c) { + if (running.compareAndSet(false, true)) { + return executeNow(c); + } else { + WrappingFuture<T> f = new WrappingFuture<T>(); + order.add(new QueuedSubmission<T>(c, f)); + queueSize++; + if (queueSize>0 && (queueSize == 50 || (queueSize<=500 && (queueSize%100)==0) || (queueSize%1000)==0) && queueSize!=lastSizeWarn) { + LOG.warn("{} is backing up, {} tasks queued", this, queueSize); + if (LOG.isDebugEnabled()) { + LOG.debug("Task queue backing up detail, queue "+this+"; task context is "+Tasks.current()+"; latest task is "+c+"; first task is "+order.peek()); + } + lastSizeWarn = queueSize; + } + return f; + } + } + int lastSizeWarn = 0; + + @SuppressWarnings({ "rawtypes", "unchecked" }) + private synchronized void onEnd() { + boolean done = false; + while (!done) { + if (order.isEmpty()) { + running.set(false); + done = true; + } else { + QueuedSubmission<?> qs = order.remove(); + queueSize--; + if (!qs.f.isCancelled()) { + Future future = executeNow(qs.c); + qs.f.setDelegate(future); + done = true; + } + } + } + } + + private synchronized <T> Future<T> executeNow(final Callable<T> c) { + return executor.submit(new Callable<T>() { + @Override public T call() throws Exception { + try { + return c.call(); + } finally { + onEnd(); + } + }}); + } + + + private static class QueuedSubmission<T> { + final Callable<T> c; + final WrappingFuture<T> f; + + QueuedSubmission(Callable<T> c, WrappingFuture<T> f) { + this.c = c; + this.f = f; + } + + @Override + public String toString() { + return "QueuedSubmission["+c+"]@"+Integer.toHexString(System.identityHashCode(this)); + } + } + + /** + * A future, where the task may not yet have been submitted to the real executor. 
+ * It delegates to the real future if present, and otherwise waits for that to appear + */ + private static class WrappingFuture<T> implements Future<T> { + private volatile Future<T> delegate; + private boolean cancelled; + + void setDelegate(Future<T> delegate) { + synchronized (this) { + this.delegate = delegate; + notifyAll(); + } + } + + @Override public boolean cancel(boolean mayInterruptIfRunning) { + if (delegate != null) { + return delegate.cancel(mayInterruptIfRunning); + } else { + cancelled = true; + synchronized (this) { + notifyAll(); + } + return true; + } + } + + @Override public boolean isCancelled() { + if (delegate != null) { + return delegate.isCancelled(); + } else { + return cancelled; + } + } + + @Override public boolean isDone() { + return (delegate != null) ? delegate.isDone() : cancelled; + } + + @Override public T get() throws CancellationException, ExecutionException, InterruptedException { + if (cancelled) { + throw new CancellationException(); + } else if (delegate != null) { + return delegate.get(); + } else { + synchronized (this) { + while (delegate == null && !cancelled) { + wait(); + } + } + return get(); + } + } + + @Override public T get(long timeout, TimeUnit unit) throws CancellationException, ExecutionException, InterruptedException, TimeoutException { + long endtime = System.currentTimeMillis()+unit.toMillis(timeout); + + if (cancelled) { + throw new CancellationException(); + } else if (delegate != null) { + return delegate.get(timeout, unit); + } else if (System.currentTimeMillis() >= endtime) { + throw new TimeoutException(); + } else { + synchronized (this) { + while (delegate == null && !cancelled && System.currentTimeMillis() < endtime) { + long remaining = endtime - System.currentTimeMillis(); + if (remaining > 0) { + wait(remaining); + } + } + } + long remaining = endtime - System.currentTimeMillis(); + return get(remaining, TimeUnit.MILLISECONDS); + } + } + } +} 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/TaskBuilder.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/TaskBuilder.java b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskBuilder.java new file mode 100644 index 0000000..b105a00 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskBuilder.java @@ -0,0 +1,184 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.Callable; + +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.TaskAdaptable; +import org.apache.brooklyn.api.management.TaskFactory; +import org.apache.brooklyn.api.management.TaskQueueingContext; + +import brooklyn.util.JavaGroovyEquivalents; +import brooklyn.util.collections.MutableList; +import brooklyn.util.collections.MutableMap; +import brooklyn.util.collections.MutableSet; + +import com.google.common.collect.Iterables; + +/** Convenience for creating tasks; note that DynamicSequentialTask is the default */ +public class TaskBuilder<T> { + + String name = null; + String description = null; + Callable<T> body = null; + Boolean swallowChildrenFailures = null; + List<TaskAdaptable<?>> children = MutableList.of(); + Set<Object> tags = MutableSet.of(); + Map<String,Object> flags = MutableMap.of(); + Boolean dynamic = null; + boolean parallel = false; + + public static <T> TaskBuilder<T> builder() { + return new TaskBuilder<T>(); + } + + public TaskBuilder<T> name(String name) { + this.name = name; + return this; + } + + public TaskBuilder<T> description(String description) { + this.description = description; + return this; + } + + /** whether task that is built has been explicitly specified to be a dynamic task + * (ie a Task which is also a {@link TaskQueueingContext} + * whereby new tasks can be added after creation */ + public TaskBuilder<T> dynamic(boolean dynamic) { + this.dynamic = dynamic; + return this; + } + + /** whether task that is built should be parallel; cannot (currently) also be dynamic */ + public TaskBuilder<T> parallel(boolean parallel) { + this.parallel = parallel; + return this; + } + + public TaskBuilder<T> body(Callable<T> body) { + this.body = body; + return this; + } + + /** sets up a dynamic task not to fail even if 
children fail */ + public TaskBuilder<T> swallowChildrenFailures(boolean swallowChildrenFailures) { + this.swallowChildrenFailures = swallowChildrenFailures; + return this; + } + + public TaskBuilder<T> body(Runnable body) { + this.body = JavaGroovyEquivalents.<T>toCallable(body); + return this; + } + + /** adds a child to the given task; the semantics of how the child is executed is set using + * {@link #dynamic(boolean)} and {@link #parallel(boolean)} */ + public TaskBuilder<T> add(TaskAdaptable<?> child) { + children.add(child); + return this; + } + + public TaskBuilder<T> addAll(Iterable<? extends TaskAdaptable<?>> additionalChildren) { + Iterables.addAll(children, additionalChildren); + return this; + } + + public TaskBuilder<T> add(TaskAdaptable<?>... additionalChildren) { + children.addAll(Arrays.asList(additionalChildren)); + return this; + } + + /** adds a tag to the given task */ + public TaskBuilder<T> tag(Object tag) { + tags.add(tag); + return this; + } + + /** adds a flag to the given task */ + public TaskBuilder<T> flag(String flag, Object value) { + flags.put(flag, value); + return this; + } + + /** adds the given flags to the given task */ + public TaskBuilder<T> flags(Map<String,Object> flags) { + this.flags.putAll(flags); + return this; + } + + @SuppressWarnings({ "unchecked", "rawtypes" }) + public Task<T> build() { + MutableMap<String, Object> taskFlags = MutableMap.copyOf(flags); + if (name!=null) taskFlags.put("displayName", name); + if (description!=null) taskFlags.put("description", description); + if (!tags.isEmpty()) taskFlags.put("tags", tags); + + if (Boolean.FALSE.equals(dynamic) && children.isEmpty()) { + if (swallowChildrenFailures!=null) + throw new IllegalArgumentException("Cannot set swallowChildrenFailures for non-dynamic task: "+this); + return new BasicTask<T>(taskFlags, body); + } + + // prefer dynamic set unless (a) user has said not dynamic, or (b) it's parallel (since there is no dynamic parallel yet) + // dynamic has 
better cancel (will interrupt the thread) and callers can submit tasks flexibly; + // however dynamic uses an extra thread and task and is noisy for contexts which don't need it + if (Boolean.TRUE.equals(dynamic) || (dynamic==null && !parallel)) { + if (parallel) + throw new UnsupportedOperationException("No implementation of parallel dynamic aggregate task available"); + DynamicSequentialTask<T> result = new DynamicSequentialTask<T>(taskFlags, body); + if (swallowChildrenFailures!=null && swallowChildrenFailures.booleanValue()) result.swallowChildrenFailures(); + for (TaskAdaptable t: children) + result.queue(t.asTask()); + return result; + } + + // T must be of type List<V> for these to be valid + if (body != null) { + throw new UnsupportedOperationException("No implementation of non-dynamic task with both body and children"); + } + if (swallowChildrenFailures!=null) { + throw new IllegalArgumentException("Cannot set swallowChildrenFailures for non-dynamic task: "+this); + } + + if (parallel) + return new ParallelTask(taskFlags, children); + else + return new SequentialTask(taskFlags, children); + } + + /** returns a factory based on this builder; each call to newTask() invokes {@link #build()} */ + public TaskFactory<Task<T>> buildFactory() { + return new TaskFactory<Task<T>>() { + public Task<T> newTask() { + return build(); + } + }; + } + + @Override + public String toString() { + return super.toString()+"["+name+"]"; + } +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/TaskInternal.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/TaskInternal.java b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskInternal.java new file mode 100644 index 0000000..b4a6569 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskInternal.java @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under 
one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import java.util.Set; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.brooklyn.api.management.ExecutionManager; +import org.apache.brooklyn.api.management.Task; + +import brooklyn.util.time.Duration; + +import com.google.common.annotations.Beta; +import com.google.common.base.Function; +import com.google.common.util.concurrent.ExecutionList; +import com.google.common.util.concurrent.ListenableFuture; + +/** + * All tasks being passed to the {@link ExecutionManager} should implement this. + * Users are strongly encouraged to use (or extend) {@link BasicTask}, rather than + * implementing a task from scratch. + * + * The methods on this interface will change in subsequent releases. Because this is + * marked as beta, the normal deprecation policy for these methods does not apply. 
+ * + * @author aled + */ +@Beta +public interface TaskInternal<T> extends Task<T> { + + /** sets the internal future object used to record the association to a job submitted to an {@link ExecutorService} */ + void initInternalFuture(ListenableFuture<T> result); + + /** returns the underlying future where this task's results will come in; see {@link #initInternalFuture(ListenableFuture)} */ + Future<T> getInternalFuture(); + + /** if the job is queued for submission (e.g. by another task) it can indicate that fact (and time) here; + * note tasks can (and often are) submitted without any queueing, in which case this value may be -1 */ + long getQueuedTimeUtc(); + + boolean isQueuedOrSubmitted(); + boolean isQueuedAndNotSubmitted(); + boolean isQueued(); + + /** marks the task as queued for execution */ + void markQueued(); + + boolean cancel(); + + boolean blockUntilStarted(Duration timeout); + + /** allows a task user to specify why a task is blocked; for use immediately before a blocking/wait, + * and typically cleared immediately afterwards; referenced by management api to inspect a task + * which is blocking + * <p> + * returns previous details, in case caller wishes to recall and restore it (e.g. if it is doing a sub-blocking) + */ + String setBlockingDetails(String blockingDetails); + + /** as {@link #setBlockingDetails(String)} but records a task which is blocking, + * for use e.g. 
in a gui to navigate to the current active subtask + * <p> + * returns previous blocking task, in case caller wishes to recall and restore it + */ + Task<?> setBlockingTask(Task<?> blockingTask); + + void resetBlockingDetails(); + + void resetBlockingTask(); + + /** returns a textual message giving details while the task is blocked */ + String getBlockingDetails(); + + /** returns a task that this task is blocked on */ + Task<?> getBlockingTask(); + + void setExtraStatusText(Object extraStatus); + + Object getExtraStatusText(); + + void runListeners(); + + void setEndTimeUtc(long val); + + void setThread(Thread thread); + + Callable<T> getJob(); + + void setJob(Callable<T> job); + + ExecutionList getListeners(); + + void setSubmitTimeUtc(long currentTimeMillis); + + void setSubmittedByTask(Task<?> task); + + Set<Object> getMutableTags(); + + void setStartTimeUtc(long currentTimeMillis); + + void applyTagModifier(Function<Set<Object>,Void> modifier); + + /** if a task is a proxy for another one (used mainly for internal tasks), + * this returns the "real" task represented by this one */ + Task<?> getProxyTarget(); + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/TaskScheduler.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/TaskScheduler.java b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskScheduler.java new file mode 100644 index 0000000..7c5d8a2 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskScheduler.java @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; + +import org.apache.brooklyn.api.management.Task; + +/** + * The scheduler is an internal mechanism to decorate {@link Task}s. + * + * It can control how the tasks are scheduled for execution (e.g. single-threaded execution, + * prioritised, etc). + */ +public interface TaskScheduler { + + public void injectExecutor(ExecutorService executor); + + /** + * Called by {@link BasicExecutionManager} to schedule tasks. + */ + public <T> Future<T> submit(Callable<T> c); +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/TaskTags.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/TaskTags.java b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskTags.java new file mode 100644 index 0000000..e404e87 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/TaskTags.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.brooklyn.core.util.task; + +import java.util.Set; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.TaskAdaptable; + +import com.google.common.base.Function; + +public class TaskTags { + + /** marks a task which is allowed to fail without failing his parent */ + public static final String INESSENTIAL_TASK = "inessential"; + + /** marks a task which is a subtask of another */ + public static final String SUB_TASK_TAG = "SUB-TASK"; + + public static void addTagDynamically(TaskAdaptable<?> task, final Object tag) { + ((BasicTask<?>)task.asTask()).applyTagModifier(new Function<Set<Object>, Void>() { + public Void apply(@Nullable Set<Object> input) { + input.add(tag); + return null; + } + }); + } + + public static void addTagsDynamically(TaskAdaptable<?> task, final Object tag1, final Object ...tags) { + ((BasicTask<?>)task.asTask()).applyTagModifier(new Function<Set<Object>, Void>() { + public Void apply(@Nullable Set<Object> input) { + input.add(tag1); + for (Object tag: tags) input.add(tag); + return null; + } + }); + } + + + public static boolean isInessential(Task<?> task) { + return hasTag(task, INESSENTIAL_TASK); + } + + public static boolean hasTag(Task<?> task, Object tag) { + return task.getTags().contains(tag); + } + + public static <U,V extends TaskAdaptable<U>> V 
markInessential(V task) { + addTagDynamically(task, INESSENTIAL_TASK); + return task; + } + +} http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/699b3f65/core/src/main/java/org/apache/brooklyn/core/util/task/Tasks.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/org/apache/brooklyn/core/util/task/Tasks.java b/core/src/main/java/org/apache/brooklyn/core/util/task/Tasks.java new file mode 100644 index 0000000..d391a90 --- /dev/null +++ b/core/src/main/java/org/apache/brooklyn/core/util/task/Tasks.java @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.brooklyn.core.util.task; + +import java.util.Collections; +import java.util.List; +import java.util.concurrent.Callable; +import java.util.concurrent.CancellationException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.FutureTask; + +import javax.annotation.Nullable; + +import org.apache.brooklyn.api.management.ExecutionContext; +import org.apache.brooklyn.api.management.HasTaskChildren; +import org.apache.brooklyn.api.management.Task; +import org.apache.brooklyn.api.management.TaskAdaptable; +import org.apache.brooklyn.api.management.TaskFactory; +import org.apache.brooklyn.api.management.TaskQueueingContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.exceptions.ReferenceWithError; +import brooklyn.util.repeat.Repeater; +import brooklyn.util.time.CountdownTimer; +import brooklyn.util.time.Duration; +import brooklyn.util.time.Time; + +import com.google.common.annotations.Beta; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import com.google.common.base.Predicate; +import com.google.common.base.Supplier; +import com.google.common.collect.Iterables; + +public class Tasks { + + private static final Logger log = LoggerFactory.getLogger(Tasks.class); + + /** convenience for setting "blocking details" on any task where the current thread is running; + * typically invoked prior to a wait, for transparency to a user; + * then invoked with 'null' just after the wait */ + public static String setBlockingDetails(String description) { + Task<?> current = current(); + if (current instanceof TaskInternal) + return ((TaskInternal<?>)current).setBlockingDetails(description); + return null; + } + public static void resetBlockingDetails() { + Task<?> current = current(); + if (current instanceof TaskInternal) + ((TaskInternal<?>)current).resetBlockingDetails(); + } + public static Task<?> 
setBlockingTask(Task<?> blocker) { + Task<?> current = current(); + if (current instanceof TaskInternal) + return ((TaskInternal<?>)current).setBlockingTask(blocker); + return null; + } + public static void resetBlockingTask() { + Task<?> current = current(); + if (current instanceof TaskInternal) + ((TaskInternal<?>)current).resetBlockingTask(); + } + + /** convenience for setting "blocking details" on any task where the current thread is running, + * while the passed code is executed; often used from groovy as + * <pre>{@code withBlockingDetails("sleeping 5s") { Thread.sleep(5000); } }</pre> + * If the code block is null this is treated as a legacy invocation: a warning is logged, the description is not set, and null is returned. */ + @SuppressWarnings("rawtypes") + public static <T> T withBlockingDetails(String description, Callable<T> code) throws Exception { + Task current = current(); + if (code==null) { + log.warn("legacy invocation of withBlockingDetails with null code block, ignoring"); + return null; + } + String prevBlockingDetails = null; + if (current instanceof TaskInternal) { + prevBlockingDetails = ((TaskInternal)current).setBlockingDetails(description); + } + try { + return code.call(); + } finally { + if (current instanceof TaskInternal) + ((TaskInternal)current).setBlockingDetails(prevBlockingDetails); + } + } + + /** the {@link Task} where the current thread is executing, if executing in a Task, otherwise null; + * if the current task is a proxy, this returns the target of that proxy */ + @SuppressWarnings("rawtypes") + public static Task current() { + return getFinalProxyTarget(BasicExecutionManager.getPerThreadCurrentTask().get()); + } + + public static Task<?> getFinalProxyTarget(Task<?> task) { + if (task==null) return null; + Task<?> proxy = ((TaskInternal<?>)task).getProxyTarget(); + if (proxy==null || proxy.equals(task)) return task; + return getFinalProxyTarget(proxy); + } + + /** creates a {@link ValueResolver} instance which allows significantly more customization than + * the various 
{@link #resolveValue(Object, Class, ExecutionContext)} methods here */ + public static <T> ValueResolver<T> resolving(Object v, Class<T> type) { + return new ValueResolver<T>(v, type); + } + + public static ValueResolver.ResolverBuilderPretype resolving(Object v) { + return new ValueResolver.ResolverBuilderPretype(v); + } + + /** @see #resolveValue(Object, Class, ExecutionContext, String) */ + public static <T> T resolveValue(Object v, Class<T> type, @Nullable ExecutionContext exec) throws ExecutionException, InterruptedException { + return new ValueResolver<T>(v, type).context(exec).get(); + } + + /** attempt to resolve the given value as the given type, waiting on futures, submitting if necessary, + * and coercing as allowed by TypeCoercions; + * contextMessage (optional) will be displayed in status reports while it waits (e.g. the name of the config key being looked up). + * if no execution context supplied (null) this method will throw an exception if the object is an unsubmitted task */ + public static <T> T resolveValue(Object v, Class<T> type, @Nullable ExecutionContext exec, String contextMessage) throws ExecutionException, InterruptedException { + return new ValueResolver<T>(v, type).context(exec).description(contextMessage).get(); + } + + /** + * @see #resolveDeepValue(Object, Class, ExecutionContext, String) + */ + public static Object resolveDeepValue(Object v, Class<?> type, ExecutionContext exec) throws ExecutionException, InterruptedException { + return resolveDeepValue(v, type, exec, null); + } + + /** + * Resolves the given object, blocking on futures and coercing it to the given type. If the object is a + * map or iterable (or a list of map of maps, etc, etc) then walks these maps/iterables to convert all of + * their values to the given type. 
For example, the following will return a list containing a map with "1"="true": + * + * {@code Object result = resolveDeepValue(ImmutableList.of(ImmutableMap.of(1, true)), String.class, exec)} + * + * To perform a deep conversion of futures contained within Iterables or Maps without coercion of each element, + * the type should normally be Object, not the type of the collection. This differs from + * {@link #resolveValue(Object, Class, ExecutionContext, String)} which will accept Map and Iterable + * as the required type. + */ + public static <T> T resolveDeepValue(Object v, Class<T> type, ExecutionContext exec, String contextMessage) throws ExecutionException, InterruptedException { + return new ValueResolver<T>(v, type).context(exec).deep(true).description(contextMessage).get(); + } + + /** sets extra status details on the current task, if possible (otherwise does nothing). + * the extra status is presented in Task.getStatusDetails(true) + */ + public static void setExtraStatusDetails(String notes) { + Task<?> current = current(); + if (current instanceof TaskInternal) + ((TaskInternal<?>)current).setExtraStatusText(notes); + } + + public static <T> TaskBuilder<T> builder() { + return TaskBuilder.<T>builder(); + } + + private static Task<?>[] asTasks(TaskAdaptable<?> ...tasks) { + Task<?>[] result = new Task<?>[tasks.length]; + for (int i=0; i<tasks.length; i++) + result[i] = tasks[i].asTask(); + return result; + } + + public static Task<List<?>> parallel(TaskAdaptable<?> ...tasks) { + return parallelInternal("parallelised tasks", asTasks(tasks)); + } + public static Task<List<?>> parallel(String name, TaskAdaptable<?> ...tasks) { + return parallelInternal(name, asTasks(tasks)); + } + public static Task<List<?>> parallel(Iterable<? extends TaskAdaptable<?>> tasks) { + return parallel(asTasks(Iterables.toArray(tasks, TaskAdaptable.class))); + } + public static Task<List<?>> parallel(String name, Iterable<? 
extends TaskAdaptable<?>> tasks) { + return parallelInternal(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class))); + } + private static Task<List<?>> parallelInternal(String name, Task<?>[] tasks) { + return Tasks.<List<?>>builder().name(name).parallel(true).add(tasks).build(); + } + + public static Task<List<?>> sequential(TaskAdaptable<?> ...tasks) { + return sequentialInternal("sequential tasks", asTasks(tasks)); + } + public static Task<List<?>> sequential(String name, TaskAdaptable<?> ...tasks) { + return sequentialInternal(name, asTasks(tasks)); + } + public static TaskFactory<?> sequential(TaskFactory<?> ...taskFactories) { + return sequentialInternal("sequential tasks", taskFactories); + } + public static TaskFactory<?> sequential(String name, TaskFactory<?> ...taskFactories) { + return sequentialInternal(name, taskFactories); + } + public static Task<List<?>> sequential(List<? extends TaskAdaptable<?>> tasks) { + return sequential(asTasks(Iterables.toArray(tasks, TaskAdaptable.class))); + } + public static Task<List<?>> sequential(String name, List<? 
extends TaskAdaptable<?>> tasks) { + return sequential(name, asTasks(Iterables.toArray(tasks, TaskAdaptable.class))); + } + private static Task<List<?>> sequentialInternal(String name, Task<?>[] tasks) { + return Tasks.<List<?>>builder().name(name).parallel(false).add(tasks).build(); + } + private static TaskFactory<?> sequentialInternal(final String name, final TaskFactory<?> ...taskFactories) { + return new TaskFactory<TaskAdaptable<?>>() { + @Override + public TaskAdaptable<?> newTask() { + TaskBuilder<List<?>> tb = Tasks.<List<?>>builder().name(name).parallel(false); + for (TaskFactory<?> tf: taskFactories) + tb.add(tf.newTask().asTask()); + return tb.build(); + } + }; + } + + /** returns the first tag found on the given task which matches the given type, looking up the submission hierarchy if {@code recurseHierarchy} is set; null if none found */ + @SuppressWarnings("unchecked") + public static <T> T tag(@Nullable Task<?> task, Class<T> type, boolean recurseHierarchy) { + // support null task to make it easier for callers to walk hierarchies + if (task==null) return null; + for (Object tag: task.getTags()) + if (type.isInstance(tag)) return (T)tag; + if (!recurseHierarchy) return null; + return tag(task.getSubmittedByTask(), type, true); + } + + public static boolean isAncestorCancelled(Task<?> t) { + if (t==null) return false; + if (t.isCancelled()) return true; + return isAncestorCancelled(t.getSubmittedByTask()); + } + + public static boolean isQueued(TaskAdaptable<?> task) { + return ((TaskInternal<?>)task.asTask()).isQueued(); + } + + public static boolean isSubmitted(TaskAdaptable<?> task) { + return ((TaskInternal<?>)task.asTask()).isSubmitted(); + } + + public static boolean isQueuedOrSubmitted(TaskAdaptable<?> task) { + return ((TaskInternal<?>)task.asTask()).isQueuedOrSubmitted(); + } + + /** + * Adds the given task to the given context. Does not throw an exception if the addition fails. + * @return true if the task was added, false otherwise. 
+ */ + public static boolean tryQueueing(TaskQueueingContext adder, TaskAdaptable<?> task) { + if (task==null || isQueued(task)) + return false; + try { + adder.queue(task.asTask()); + return true; + } catch (Exception e) { + if (log.isDebugEnabled()) + log.debug("Could not add task "+task+" at "+adder+": "+e); + return false; + } + } + + /** see also {@link #resolving(Object)} which gives much more control about submission, timeout, etc */ + public static <T> Supplier<T> supplier(final TaskAdaptable<T> task) { + return new Supplier<T>() { + @Override + public T get() { + return task.asTask().getUnchecked(); + } + }; + } + + /** returns all child tasks of the given task, if it is a {@link HasTaskChildren}, else an empty list */ + public static Iterable<Task<?>> children(Task<?> task) { + if (task instanceof HasTaskChildren) + return ((HasTaskChildren)task).getChildren(); + return Collections.emptyList(); + } + + /** returns the subset of the given tasks for which {@link Task#isError()} is true */ + public static Iterable<Task<?>> failed(Iterable<Task<?>> subtasks) { + return Iterables.filter(subtasks, new Predicate<Task<?>>() { + @Override + public boolean apply(Task<?> input) { + return input.isError(); + } + }); + } + + /** returns the task, its children, and all their children, and so on; + * @param root task whose descendants should be iterated + * @param parentFirst whether to put parents before children or after + */ + public static Iterable<Task<?>> descendants(Task<?> root, final boolean parentFirst) { + Iterable<Task<?>> descs = Iterables.concat(Iterables.transform(Tasks.children(root), new Function<Task<?>,Iterable<Task<?>>>() { + @Override + public Iterable<Task<?>> apply(Task<?> input) { + return descendants(input, parentFirst); + } + })); + if (parentFirst) return Iterables.concat(Collections.singleton(root), descs); + else return Iterables.concat(descs, Collections.singleton(root)); + } + + /** returns the error thrown by the task if {@link Task#isError()}, or null if no error or not done */ + public static Throwable 
getError(Task<?> t) { + if (t==null) return null; + if (!t.isDone()) return null; + if (t.isCancelled()) return new CancellationException(); + try { + t.get(); + return null; + } catch (Throwable error) { + // do not propagate as we are pretty much guaranteed above that it wasn't this + // thread which originally threw the error + return error; + } + } + public static Task<Void> fail(final String name, final Throwable optionalError) { + return Tasks.<Void>builder().dynamic(false).name(name).body(new Runnable() { public void run() { + if (optionalError!=null) throw Exceptions.propagate(optionalError); else throw new RuntimeException("Failed: "+name); + } }).build(); + } + public static Task<Void> warning(final String message, final Throwable optionalError) { + log.warn(message); + return TaskTags.markInessential(fail(message, optionalError)); + } + + /** marks the current task inessential; this mainly matters if the task is running in a parent + * {@link TaskQueueingContext} and we don't want the parent to fail if this task fails + * <p> + * if not in a task, the current {@link TaskQueueingContext}'s task is marked instead; no-op (silently ignored) if there is neither */ + public static void markInessential() { + Task<?> task = Tasks.current(); + if (task==null) { + TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext(); + if (qc!=null) task = qc.asTask(); + } + if (task!=null) { + TaskTags.markInessential(task); + } + } + + /** causes failures in subtasks of the current task not to fail the parent; + * throws {@link NullPointerException} if not in a {@link TaskQueueingContext}. 
+ * <p> + * essentially like a {@link #markInessential()} on all tasks in the current + * {@link TaskQueueingContext}, including tasks queued subsequently */ + @Beta + public static void swallowChildrenFailures() { + Preconditions.checkNotNull(DynamicTasks.getTaskQueuingContext(), "Task queueing context required here"); + TaskQueueingContext qc = DynamicTasks.getTaskQueuingContext(); + if (qc!=null) { + qc.swallowChildrenFailures(); + } + } + + /** as {@link TaskTags#addTagDynamically(TaskAdaptable, Object)} but for current task, skipping if no current task */ + public static void addTagDynamically(Object tag) { + Task<?> t = Tasks.current(); + if (t!=null) TaskTags.addTagDynamically(t, tag); + } + + /** + * Workaround for limitation described at {@link Task#cancel(boolean)}; + * internal method used to allow callers to wait for underlying tasks to finish in the case of cancellation. + * <p> + * It is irritating that {@link FutureTask} sync's object clears the runner thread, + * so even if {@link BasicTask#getInternalFuture()} is used, there is no means of determining if the underlying object is done. + * The {@link Task#getEndTimeUtc()} seems the only way. 
+ * + * @return true if tasks ended; false if timed out + **/ + @Beta + public static boolean blockUntilInternalTasksEnded(Task<?> t, Duration timeout) { + CountdownTimer timer = timeout.countdownTimer(); + + if (t==null) + return true; + + if (t instanceof ScheduledTask) { + boolean result = ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining()); + if (!result) return false; + } + + t.blockUntilEnded(timer.getDurationRemaining()); + + while (true) { + if (t.getEndTimeUtc()>=0) return true; + // above should be sufficient; but just in case, trying the below + Thread tt = t.getThread(); + if (t instanceof ScheduledTask) { + ((ScheduledTask)t).blockUntilNextRunFinished(timer.getDurationRemaining()); + return true; + } else { + if (tt==null || !tt.isAlive()) { + if (!t.isCancelled()) { + // may happen for a cancelled task, interrupted after submit but before start + log.warn("Internal task thread is dead or null ("+tt+") but task not ended: "+t.getEndTimeUtc()+" ("+t+")"); + } + return true; + } + } + if (timer.isExpired()) + return false; + Time.sleep(Repeater.DEFAULT_REAL_QUICK_PERIOD); + } + } + + /** returns true if either the current thread or the current task is interrupted/cancelled */ + public static boolean isInterrupted() { + if (Thread.currentThread().isInterrupted()) return true; + Task<?> t = current(); + if (t==null) return false; + return t.isCancelled(); + } + + private static class WaitForRepeaterCallable implements Callable<Boolean> { + protected Repeater repeater; + protected boolean requireTrue; + + public WaitForRepeaterCallable(Repeater repeater, boolean requireTrue) { + this.repeater = repeater; + this.requireTrue = requireTrue; + } + + @Override + public Boolean call() { + ReferenceWithError<Boolean> result; + Tasks.setBlockingDetails(repeater.getDescription()); + try { + result = repeater.runKeepingError(); + } finally { + Tasks.resetBlockingDetails(); + } + + if (Boolean.TRUE.equals(result.getWithoutError())) + return 
true; + if (result.hasError()) + throw Exceptions.propagate(result.getError()); + if (requireTrue) + throw new IllegalStateException("timeout - "+repeater.getDescription()); + return false; + } + } + + /** @return a {@link TaskBuilder} which tests whether the repeater terminates with success in its configured timeframe, + * returning true or false depending on whether the repeater succeeds */ + public static TaskBuilder<Boolean> testing(Repeater repeater) { + return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, false)) + .name("waiting for condition") + .description("Testing whether " + getTimeoutString(repeater) + ": "+repeater.getDescription()); + } + + /** @return a {@link TaskBuilder} which requires that the repeater terminate with success in its configured timeframe, + * throwing if it does not */ + public static TaskBuilder<?> requiring(Repeater repeater) { + return Tasks.<Boolean>builder().body(new WaitForRepeaterCallable(repeater, true)) + .name("waiting for condition") + .description("Requiring " + getTimeoutString(repeater) + ": " + repeater.getDescription()); + } + + private static String getTimeoutString(Repeater repeater) { + Duration timeout = repeater.getTimeLimit(); + if (timeout==null || Duration.PRACTICALLY_FOREVER.equals(timeout)) + return "eventually"; + return "in "+timeout; + } + +}
