For a moment, lets consider how we might fix TaskManager, while retaining the existing Task.runAfter method and how the fix would impact TaskManager's users.
TaskManager, like most thread pools has a queue. Before a task is removed from the queue for execution, it is asked if it should "runAfter" any other task present in the queue, while this occurs, the queue is locked. Each task is removed from the queue before it executes. Now what happens if a task fails to execute, or it's thread is suspended by the OS scheduler and is not in the queue during execution and another task/s that depend on it need to "runAfter" are allowed to execute before it completes? Because a task doesn't know which tasks depend upon it running first, the only way to fix TaskManager is to only remove a task after it completes successfully. Now TaskManager has no scheduling capability, which means that the fix will require the task to both remain in the queue and consume a pool thread until it successfully completes because once the Task.run method completes it is assumed to have completed successfully. RetryTask and WakupManager would also need to be rewritten to accommodate this requirement. The result will be correctly ordered execution of tasks, with increased thread & memory consumption as well as significantly reduced throughput. The reduced throughput could also help mask other issues by reducing concurrency. Regards, Peter. ----- Original message ----- > The problem is with TaskManager's public api method Task.runAfter. This > is well documented in River-344. > > The fix requires changing every class that uses it > > As a thread pool TaskManager is correct provided that no ordering > dependencies exist between tasks. > > TaskManager doesn't compare to Doug Lee's ExecutorService > implementations, it should be consigned to history, lets clean our code > up. > > What configuration choice did we originally have anyway? Any Executor > you like as long as it's TaskManager? > > TaskManager is not part of our public api, it's an implementation detail > in com.sun.* > > Regards, > > Peter. > ----- Original message ----- > > > > I’d like you to make a reasonable case for why TaskManager needs to be > > replaced, requiring changes to many other classes that depend on > > TaskManager, rather than stating what the problem is with TaskManager > > and seeking to fix it, which would only affect TaskManager and not > > require modifying and then debugging other code. > > > > Greg. > > > > On Jan 4, 2014, at 5:53 AM, Peter Firmstone <j...@zeus.net.au> wrote: > > > > > Would you like me to add this class, so that existing configurations > > > utilising a TaskManager can also be used? This might be useful for > > > retaining backward compatibility with existing configurations? > > > > > > Regards, > > > > > > Peter. > > > > > > > > > /* > > > * Licensed to the Apache Software Foundation (ASF) under one > > > * or more contributor license agreements. See the NOTICE file > > > * distributed with this work for additional information > > > * regarding copyright ownership. The ASF licenses this file > > > * to you under the Apache License, Version 2.0 (the > > > * "License"); you may not use this file except in compliance > > > * with the License. You may obtain a copy of the License at > > > * > > > * http://www.apache.org/licenses/LICENSE-2.0 > > > * > > > * Unless required by applicable law or agreed to in writing, software > > > * distributed under the License is distributed on an "AS IS" BASIS, > > > * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or > > > implied. * See the License for the specific language governing > > > permissions and * limitations under the License. > > > */ > > > > > > package com.sun.jini.thread; > > > > > > import com.sun.jini.thread.TaskManager.Task; > > > import java.util.List; > > > import java.util.concurrent.AbstractExecutorService; > > > import java.util.concurrent.ExecutorService; > > > import java.util.concurrent.RejectedExecutionException; > > > import java.util.concurrent.TimeUnit; > > > > > > /** > > > * > > > * @author peter > > > */ > > > public class TaskManagerWrapper extends AbstractExecutorService > > > implements ExecutorService { > > > > > > private final TaskManager tm; > > > private final PosionPill pill; > > > private volatile boolean isShutdown; > > > > > > public TaskManagerWrapper(TaskManager manager){ > > > tm = manager; > > > isShutdown = false; > > > pill = new PosionPill(manager); > > > } > > > > > > @Override > > > public void shutdown() { > > > isShutdown = true; > > > tm.add(pill); > > > } > > > > > > @Override > > > public List<Runnable> shutdownNow() { > > > isShutdown = true; > > > tm.terminate(); > > > return tm.getPending(); > > > } > > > > > > @Override > > > public boolean isShutdown() { > > > return isShutdown; > > > } > > > > > > @Override > > > public boolean isTerminated() { > > > return isShutdown; > > > } > > > > > > @Override > > > public boolean awaitTermination(long timeout, TimeUnit unit) throws > > > InterruptedException { long start = System.currentTimeMillis(); > > > long duration = unit.toMillis(timeout); > > > synchronized (pill){ > > > while (!pill.terminated){ > > > wait(duration); > > > if (pill.terminated) return true; > > > long elapsed = System.currentTimeMillis() - start; > > > if (elapsed >= duration) return false; > > > duration = duration - elapsed; > > > } > > > } > > > return true; // pill was terminated. > > > } > > > > > > @Override > > > public void execute(Runnable command) { > > > if (isShutdown) throw new RejectedExecutionException("TaskManager > > > terminated"); } > > > > > > private static class PosionPill implements Task { > > > > > > private final TaskManager tm; > > > boolean terminated; > > > > > > PosionPill(TaskManager tm){ > > > this.tm = tm; > > > terminated = false; > > > } > > > > > > @Override > > > public boolean runAfter(List tasks, int size) { > > > if (!tasks.isEmpty()) return true; // Make sure we always run last. > > > return false; > > > } > > > > > > @Override > > > public void run() { > > > tm.terminate(); // Bye. > > > synchronized (this){ > > > terminated = true; > > > notifyAll(); > > > } > > > } > > > > > > } > > > > > > } > > >