runAfter is a method in the TaskManager.Task interface, implemented by
each of its tasks:
/**
* Return true if this task must be run after at least one task
* in the given task list with an index less than size (size may be
* less than tasks.size()). Using List.get will be more efficient
* than List.iterator.
*
* @param tasks the tasks to consider. A read-only List, with all
* elements instanceof Task.
* @param size elements with index less than size should be considered
*
The notes I sent to Peter were part of an effort on my part to improve
performance. The runAfter scheme has O(N^2) tendencies, because whenever a
task finishes the TaskManager has to ask each waiting task whether it still
needs to wait for any older task. I wanted to change it so that the
TaskManager would know which task was being waited for. It could then
associate with each task a list of the tasks that need to be reconsidered
when it finishes.
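A minimal sketch of that bookkeeping, to make the idea concrete. WaiterIndex, addWaiter and taskFinished are hypothetical names of mine, not part of TaskManager, and the sketch assumes the manager can learn which older task a waiting task is blocked on, which the current runAfter signature does not report.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch; not the River TaskManager API. */
final class WaiterIndex<T> {

    // blocker -> tasks that must be reconsidered when the blocker finishes
    private final Map<T, List<T>> waiters = new HashMap<>();

    /** Record that 'waiting' cannot run until 'blocker' has finished. */
    synchronized void addWaiter(T blocker, T waiting) {
        waiters.computeIfAbsent(blocker, k -> new ArrayList<>()).add(waiting);
    }

    /**
     * Called when 'finished' completes. Returns only the tasks that were
     * waiting on it, so the manager does not have to poll every waiting task.
     */
    synchronized List<T> taskFinished(T finished) {
        List<T> ready = waiters.remove(finished);
        return ready != null ? ready : new ArrayList<>();
    }
}
```

A returned task is only a candidate: it may still have to wait for some other older task, so it would be reconsidered rather than started unconditionally.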
I think runAfter has two possible uses, and I'm not sure which cases are
for which purpose:
1. Mutual exclusion - two tasks should not be running at the same time.
That could be implemented by the younger returning true for a task list
containing the older. In this case the sort of overtaking I described
below does not matter.
2. Order preservation - A task needs a state change to have happened
that will not happen until after an older task has run.
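To illustrate the difference, here is a rough sketch of one runAfter implementation per use. SketchTask is a stand-in whose signature I have inferred from the Javadoc quoted above; ExclusiveTask, OrderedTask, and the resource and prerequisite fields are invented for the example and do not correspond to any existing task class.

```java
import java.util.List;

/** Stand-in for TaskManager.Task; signature inferred from the Javadoc above. */
interface SketchTask {
    boolean runAfter(List<?> tasks, int size);
}

/** Use 1, mutual exclusion: wait while any older task uses the same resource. */
final class ExclusiveTask implements SketchTask {
    final Object resource; // hypothetical key identifying what must not be shared

    ExclusiveTask(Object resource) {
        this.resource = resource;
    }

    public boolean runAfter(List<?> tasks, int size) {
        for (int i = 0; i < size; i++) { // List.get, as the Javadoc advises
            Object t = tasks.get(i);
            if (t instanceof ExclusiveTask
                    && ((ExclusiveTask) t).resource.equals(resource)) {
                return true;
            }
        }
        return false;
    }
}

/** Use 2, order preservation: wait for one specific older task. */
final class OrderedTask implements SketchTask {
    final SketchTask prerequisite; // the task whose state change is needed

    OrderedTask(SketchTask prerequisite) {
        this.prerequisite = prerequisite;
    }

    public boolean runAfter(List<?> tasks, int size) {
        for (int i = 0; i < size; i++) {
            if (tasks.get(i) == prerequisite) {
                return true;
            }
        }
        return false;
    }
}
```

In the second case the task knows exactly which older task it is waiting for, which is the information a dependency list in the TaskManager would need.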
Patricia
On 4/2/2013 2:17 PM, Bryan Thompson wrote:
I am not clear on the semantics for runAfter, but maybe this can be
achieved by wrapping a Runnable within another Runnable such that the 2nd
runnable is automatically scheduled after the first has succeeded?
Likewise, it is possible to wrap a Runnable in order to automatically
retry if it throws an exception.
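As a rough sketch of both wrappers (RunnableWrappers, then and retrying are hypothetical names, and this only covers a fixed "B after A" ordering, which may not match the real runAfter semantics):

```java
import java.util.concurrent.Executor;

final class RunnableWrappers {

    private RunnableWrappers() {
    }

    /** Runs 'first'; only if it returns normally is 'second' handed to the executor. */
    static Runnable then(Runnable first, Runnable second, Executor executor) {
        return () -> {
            first.run(); // an exception here propagates and skips 'second'
            executor.execute(second);
        };
    }

    /** Re-runs 'task' until it succeeds or 'maxAttempts' attempts have failed. */
    static Runnable retrying(Runnable task, int maxAttempts) {
        if (maxAttempts < 1)
            throw new IllegalArgumentException("maxAttempts < 1");
        return () -> {
            RuntimeException last = null;
            for (int attempt = 1; attempt <= maxAttempts; attempt++) {
                try {
                    task.run();
                    return; // success, stop retrying
                } catch (RuntimeException ex) {
                    last = ex; // remember the failure and try again
                }
            }
            throw last; // every attempt failed; rethrow the last failure
        };
    }
}
```

For example, executor.execute(RunnableWrappers.then(a, b, executor)) queues b only after a has completed without throwing. The retry wrapper retries immediately and on the same thread; it does not wait between attempts.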
There are people who are experts at these patterns, but an example is
given (below my signature) for an Executor that wraps an ExecutorService
and queues Runnable instances with limited parallelism. It hooks the
Runnable in its own run() method.
If you use a ScheduledExecutorService, you can queue a task to run after an
initial delay and then repeatedly with a fixed delay (or at a fixed rate).
The task will be rescheduled *unless* it throws an exception. This could be
reused to periodically retry a task after a timeout if we convert an error
thrown in the task into "no error" (hence run again after a fixed delay) and
throw a known exception if there is no error (to terminate the retry of the
task). A bit of a hack, but it leverages existing code for re-running a
task with a fixed delay.
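Roughly, the hack could look like the sketch below. FixedDelayRetry, Done, retryUntilSuccess and awaitSuccess are hypothetical names, and for simplicity the sketch only converts RuntimeException failures into "no error".

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class FixedDelayRetry {

    /** Hypothetical marker exception, thrown on success to stop the schedule. */
    static final class Done extends RuntimeException {
    }

    /** Re-runs 'task' every 'delayMillis' until one run completes without throwing. */
    static ScheduledFuture<?> retryUntilSuccess(ScheduledExecutorService ses,
            Runnable task, long delayMillis) {
        return ses.scheduleWithFixedDelay(() -> {
            try {
                task.run();
            } catch (RuntimeException ex) {
                return; // failure becomes "no error", so the task is rescheduled
            }
            throw new Done(); // success: an exception suppresses further runs
        }, 0, delayMillis, TimeUnit.MILLISECONDS);
    }

    /** Blocks until the schedule stops; true if it stopped because the task succeeded. */
    static boolean awaitSuccess(ScheduledFuture<?> future) {
        try {
            future.get(); // for a periodic task this only ever returns by throwing
            return false;
        } catch (ExecutionException e) {
            return e.getCause() instanceof Done;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

The caller has to tell the Done marker apart from a genuine failure when inspecting the returned future, which is part of what makes this a hack.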
Thanks,
Bryan
package com.bigdata.util.concurrent;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;
import org.apache.log4j.Logger;
/**
 * A flyweight helper class that runs tasks either sequentially or with
 * limited parallelism against some thread pool. Deadlock can arise when
 * limited parallelism is applied if there are dependencies among the tasks.
 * Limited parallelism is enforced using a counting {@link Semaphore}. New
 * tasks can start iff the latch is non-zero. The maximum parallelism is the
 * minimum of the value specified to the constructor and the potential
 * parallelism of the delegate service.
 * <p>
 * Note: The pattern for running tasks on this service is generally to
 * {@link #execute(Runnable)} a {@link Runnable} and to make that
 * {@link Runnable} a {@link FutureTask} if you want to await the
 * {@link Future} of a {@link Runnable} or {@link Callable} or otherwise
 * manage its execution.
 * <p>
 * Note: This class can NOT be trivially wrapped as an
 * {@link ExecutorService} since the resulting delegation pattern for
 * submit() winds up invoking execute() on the delegate
 * {@link ExecutorService} rather than on this class.
 *
 * @author <a href="mailto:thompson...@users.sourceforge.net">Bryan Thompson</a>
 * @version $Id: LatchedExecutor.java 6749 2012-12-03 14:42:48Z thompsonbry $
 */
public class LatchedExecutor implements Executor {

    private static final transient Logger log = Logger
            .getLogger(LatchedExecutor.class);

    /**
     * The delegate executor.
     */
    private final Executor executor;

    /**
     * This is used to limit the concurrency with which tasks submitted to
     * this class may execute on the delegate {@link #executor}.
     */
    private final Semaphore semaphore;

    /**
     * A thread-safe blocking queue of pending tasks.
     *
     * @todo The capacity of this queue does not of necessity need to be
     *       unbounded.
     */
    private final BlockingQueue<Runnable> queue =
            new LinkedBlockingDeque<Runnable>(/* unbounded */);

    private final int nparallel;

    /**
     * Return the maximum parallelism allowed by this {@link Executor}.
     */
    public int getNParallel() {
        return nparallel;
    }

    public LatchedExecutor(final Executor executor, final int nparallel) {
        if (executor == null)
            throw new IllegalArgumentException();
        if (nparallel < 1)
            throw new IllegalArgumentException();
        this.executor = executor;
        this.nparallel = nparallel;
        this.semaphore = new Semaphore(nparallel);
    }

    public void execute(final Runnable r) {
        if (!queue.offer(new Runnable() {
            /*
             * Wrap the Runnable in a class that will start the next Runnable
             * from the queue when it completes.
             */
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        })) {
            // The queue is full.
            throw new RejectedExecutionException();
        }
        if (semaphore.tryAcquire()) {
            // We were able to obtain a permit, so start another task.
            scheduleNext();
        }
    }

    /**
     * Schedule the next task if one is available (non-blocking).
     * <p>
     * Pre-condition: The caller has a permit.
     */
    private void scheduleNext() {
        while (true) {
            Runnable next = null;
            if ((next = queue.poll()) != null) {
                try {
                    executor.execute(next);
                    return;
                } catch (RejectedExecutionException ex) {
                    // log error and poll the queue again.
                    log.error(ex, ex);
                    continue;
                }
            } else {
                semaphore.release();
                return;
            }
        }
    }
}