I agree with the idea of understanding the use cases before designing
the solution, and with using standard API classes as much as possible.
The table I sent you was intended as a first step towards that.
I'm not convinced that the right solution is a single TaskManager
successor. Different TaskManager instances may have different use cases,
and separating them may lead to several simpler solutions, each of which
has a narrower set of requirements.
Patricia
On 4/3/2013 1:55 PM, Peter wrote:
Gut feeling suggests the solution will be executor based, so you're
asking good questions. I think we need to understand the use cases
better, and probably redesign dependent code too.
One example of retry: the task will continue attempting to retry for
an entire day.
We might need some kind of delay queue, where dependent tasks can
signal to following tasks when it's ok to execute.
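(Not from the thread, but as one possible sketch of that signalling: the
standard CompletableFuture API already lets a task signal a follower when
it is ok to execute, which might cover this without a hand-built delay
queue. SignalChain and runDemo are hypothetical names for illustration.)

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SignalChain {

    // Hypothetical sketch: the completion of 'first' is the signal that
    // the following task may execute.
    static String runDemo() {
        final ExecutorService pool = Executors.newFixedThreadPool(2);
        final StringBuilder order = new StringBuilder();
        final CompletableFuture<Void> first =
                CompletableFuture.runAsync(() -> order.append("first,"), pool);
        // The follower is only started once 'first' has completed normally.
        first.thenRunAsync(() -> order.append("second"), pool).join();
        pool.shutdown();
        return order.toString();
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // prints first,second
    }
}
```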
----- Original message -----
I am not clear on the semantics for runAfter, but maybe this can
be achieved by wrapping a Runnable within another Runnable such
that the 2nd runnable is automatically scheduled after the first
has succeeded? Likewise, it is possible to wrap a Runnable in order
to automatically retry if it throws an exception.
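To make the wrapping idea concrete, here is a minimal sketch of both
wrappers. The helper names (RunnableWrappers, runAfter, withRetry) are
hypothetical, not from any existing API; the retry budget is a simple
attempt count rather than the day-long policy Peter mentioned.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class RunnableWrappers {

    // Run 'first', then run 'second' only if 'first' completed normally.
    static Runnable runAfter(final Runnable first, final Runnable second) {
        return new Runnable() {
            public void run() {
                first.run();  // any exception propagates and skips 'second'
                second.run(); // only reached on success
            }
        };
    }

    // Re-run 'task' until it succeeds or the attempt budget is exhausted
    // (maxAttempts is assumed to be >= 1).
    static Runnable withRetry(final Runnable task, final int maxAttempts) {
        return new Runnable() {
            public void run() {
                RuntimeException last = null;
                for (int i = 0; i < maxAttempts; i++) {
                    try {
                        task.run();
                        return; // success: stop retrying
                    } catch (RuntimeException ex) {
                        last = ex;
                    }
                }
                throw last; // budget exhausted: propagate the last failure
            }
        };
    }

    public static void main(String[] args) {
        final AtomicInteger calls = new AtomicInteger();
        // Fails twice, then succeeds on the third attempt.
        final Runnable flaky = () -> {
            if (calls.incrementAndGet() < 3)
                throw new RuntimeException("transient failure");
        };
        withRetry(flaky, 5).run();
        System.out.println("attempts=" + calls.get()); // prints attempts=3
    }
}
```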
There are people who are experts at these patterns, but an example
is given (below my signature) for an Executor that wraps an
ExecutorService and queues Runnable instances with limited
parallelism. It hooks the Runnable in its own run() method.
If you use a ScheduledExecutorService, you can queue a task to run
with an initial and repeated delay (or at a repeated interval).
The task will be rescheduled *unless* it throws an exception. This
could be reused to periodically re-try a task after a timeout: convert
an error thrown in the task into "no error" (so the task runs again
after the fixed delay) and throw a known exception when the task
succeeds (to terminate the retry). A bit of a hack, but it leverages
existing code for re-running a task with a fixed delay.
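A sketch of that inversion hack, using the standard
ScheduledExecutorService.scheduleWithFixedDelay (which stops
rescheduling once the periodic task throws). The class and method names
are hypothetical, and the flaky task here simply succeeds on its third
attempt.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RetryWithFixedDelay {

    static int runDemo() throws InterruptedException {
        final ScheduledExecutorService ses = Executors.newScheduledThreadPool(1);
        final AtomicInteger attempts = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(1);

        // The "real" task: fails on the first two attempts.
        final Runnable task = () -> {
            if (attempts.incrementAndGet() < 3)
                throw new RuntimeException("not ready yet");
        };

        // The hack: swallow failures so the service keeps rescheduling,
        // and throw a known exception on success to end the periodic run.
        ses.scheduleWithFixedDelay(() -> {
            try {
                task.run();
            } catch (RuntimeException ex) {
                return; // "no error": run again after the fixed delay
            }
            done.countDown();
            throw new RuntimeException("done"); // terminates rescheduling
        }, 0, 10, TimeUnit.MILLISECONDS);

        done.await();
        ses.shutdown();
        return attempts.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("attempts=" + runDemo()); // prints attempts=3
    }
}
```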
Thanks, Bryan
package com.bigdata.util.concurrent;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.Semaphore;

import org.apache.log4j.Logger;

/**
 * A fly weight helper class that runs tasks either sequentially or with
 * limited parallelism against some thread pool. Deadlock can arise when
 * limited parallelism is applied if there are dependencies among the tasks.
 * Limited parallelism is enforced using a counting {@link Semaphore}. New
 * tasks can start iff the latch is non-zero. The maximum parallelism is the
 * minimum of the value specified to the constructor and the potential
 * parallelism of the delegate service.
 * <p>
 * Note: The pattern for running tasks on this service is generally to
 * {@link #execute(Runnable)} a {@link Runnable} and to make that
 * {@link Runnable} a {@link FutureTask} if you want to await the
 * {@link Future} of a {@link Runnable} or {@link Callable} or otherwise
 * manage its execution.
 * <p>
 * Note: This class can NOT be trivially wrapped as an
 * {@link ExecutorService} since the resulting delegation pattern for
 * submit() winds up invoking execute() on the delegate
 * {@link ExecutorService} rather than on this class.
 *
 * @author <a href="mailto:thompson...@users.sourceforge.net">Bryan Thompson</a>
 * @version $Id: LatchedExecutor.java 6749 2012-12-03 14:42:48Z thompsonbry $
 */
public class LatchedExecutor implements Executor {

    private static final transient Logger log = Logger
            .getLogger(LatchedExecutor.class);

    /**
     * The delegate executor.
     */
    private final Executor executor;

    /**
     * This is used to limit the concurrency with which tasks submitted to
     * this class may execute on the delegate {@link #executor}.
     */
    private final Semaphore semaphore;

    /**
     * A thread-safe blocking queue of pending tasks.
     *
     * @todo The capacity of this queue does not of necessity need to be
     *       unbounded.
     */
    private final BlockingQueue<Runnable> queue = new LinkedBlockingDeque<Runnable>(/* unbounded */);

    private final int nparallel;

    /**
     * Return the maximum parallelism allowed by this {@link Executor}.
     */
    public int getNParallel() {

        return nparallel;

    }

    public LatchedExecutor(final Executor executor, final int nparallel) {

        if (executor == null)
            throw new IllegalArgumentException();

        if (nparallel < 1)
            throw new IllegalArgumentException();

        this.executor = executor;

        this.nparallel = nparallel;

        this.semaphore = new Semaphore(nparallel);

    }

    public void execute(final Runnable r) {
        if (!queue.offer(new Runnable() {
            /*
             * Wrap the Runnable in a class that will start the next Runnable
             * from the queue when it completes.
             */
            public void run() {
                try {
                    r.run();
                } finally {
                    scheduleNext();
                }
            }
        })) {
            // The queue is full.
            throw new RejectedExecutionException();
        }
        if (semaphore.tryAcquire()) {
            // We were able to obtain a permit, so start another task.
            scheduleNext();
        }
    }

    /**
     * Schedule the next task if one is available (non-blocking).
     * <p>
     * Pre-condition: The caller has a permit.
     */
    private void scheduleNext() {
        while (true) {
            Runnable next = null;
            if ((next = queue.poll()) != null) {
                try {
                    executor.execute(next);
                    return;
                } catch (RejectedExecutionException ex) {
                    // log error and poll the queue again.
                    log.error(ex, ex);
                    continue;
                }
            } else {
                semaphore.release();
                return;
            }
        }
    }

}
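(Not part of the original mail: a self-contained demo of the same
limited-parallelism pattern. The inner Latched class is a trimmed copy of
the LatchedExecutor above with the log4j dependency and the
rejected-execution retry loop removed, so it compiles on its own; the
names LatchedDemo, Latched, and maxObserved are hypothetical.)

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicInteger;

public class LatchedDemo {

    // Trimmed-down copy of the LatchedExecutor pattern: a counting
    // semaphore bounds how many wrapped tasks run on the delegate at once.
    static class Latched implements Executor {
        private final Executor delegate;
        private final Semaphore semaphore;
        private final BlockingQueue<Runnable> queue = new LinkedBlockingDeque<>();

        Latched(final Executor delegate, final int nparallel) {
            this.delegate = delegate;
            this.semaphore = new Semaphore(nparallel);
        }

        public void execute(final Runnable r) {
            // Wrap the task so that, on completion, it starts the next one.
            queue.offer(() -> {
                try { r.run(); } finally { scheduleNext(); }
            });
            if (semaphore.tryAcquire()) // got a permit: start another task
                scheduleNext();
        }

        private void scheduleNext() {
            final Runnable next = queue.poll();
            if (next != null)
                delegate.execute(next);
            else
                semaphore.release(); // nothing queued: return the permit
        }
    }

    // Submit 20 short tasks through a parallelism-2 latch on a 10-thread
    // pool and report the highest concurrency actually observed.
    static int maxObserved() throws InterruptedException {
        final ExecutorService pool = Executors.newFixedThreadPool(10);
        final Executor latched = new Latched(pool, 2);
        final AtomicInteger running = new AtomicInteger();
        final AtomicInteger max = new AtomicInteger();
        final CountDownLatch done = new CountDownLatch(20);
        for (int i = 0; i < 20; i++) {
            latched.execute(() -> {
                max.accumulateAndGet(running.incrementAndGet(), Math::max);
                try { Thread.sleep(5); } catch (InterruptedException e) { }
                running.decrementAndGet();
                done.countDown();
            });
        }
        done.await();
        pool.shutdown();
        return max.get();
    }

    public static void main(String[] args) throws InterruptedException {
        // Despite the 10-thread pool, at most 2 tasks ever run at once.
        System.out.println("max concurrent = " + maxObserved());
    }
}
```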