Author: peter_firmstone Date: Mon Feb 3 11:56:32 2014 New Revision: 1563851
URL: http://svn.apache.org/r1563851 Log: New dependency linking and call backs for JoinManager ProxyRegTask. Same dependency code. Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java?rev=1563851&r1=1563850&r2=1563851&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/JoinManager.java Mon Feb 3 11:56:32 2014 @@ -53,9 +53,11 @@ import java.util.Iterator; import java.util.LinkedList; import java.util.List; import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; +import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.PriorityBlockingQueue; import java.util.concurrent.RunnableFuture; import java.util.concurrent.ThreadPoolExecutor; @@ -63,6 +65,7 @@ import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import org.apache.river.api.util.FutureObserver; +import org.apache.river.impl.thread.DependencyLinker; import org.apache.river.impl.thread.ExtensibleExecutorService; import org.apache.river.impl.thread.ExtensibleExecutorService.RunnableFutureFactory; import org.apache.river.impl.thread.NamedThreadFactory; @@ -688,22 +691,17 @@ public class JoinManager { * @param tasks the tasks with which to compare the current task * @param size elements with index less than size are considered */ -// public boolean runAfter(List tasks, int size) { -// /* If the service's ID has already been set, then it's okay -// * to run all ProxyRegTask's in parallel, otherwise, the -// * ProxyRegTask with the lowest sequence number should be run. -// */ -// if(serviceItem.serviceID != null) return false; -// /* For task with lowest seq #, run it now; else run it later */ -// for(int i=0; i<size; i++) { -// Object t = tasks.get(i); -// if (t instanceof ProxyRegTask){ -// int nextTaskSeqN = ((ProxyRegTask)t).getSeqN(); -// if( seqN > nextTaskSeqN ) return true; -// } -// }//end loop -// return false; -// }//end runAfter + public boolean dependsOn(ProxyRegTask t) { + return seqN > t.getSeqN(); + } + + /* If the service's ID has already been set, then it's okay + * to run all ProxyRegTask's in parallel, otherwise, the + * ProxyRegTask with the lowest sequence number should be run. + */ + public boolean hasDeps(){ + return serviceItem.serviceID == null; + } /** Accessor method that returns the instance of <code>ProxyReg</code> * (the lookup service) associated with the task represented by @@ -755,6 +753,46 @@ public class JoinManager { return 0; } }//end class ProxyRegTask + + private static final class ProxyRegTaskQueue implements FutureObserver { + // CacheTasks pending completion. + private final ConcurrentLinkedQueue<ProxyRegTask> pending; + private final ExecutorService executor; + + private ProxyRegTaskQueue(ExecutorService e){ + this.pending = new ConcurrentLinkedQueue<ProxyRegTask>(); + executor = e; + } + + private Future submit(ProxyRegTask t){ + pending.offer(t); + t.addObserver(this); + if (t.hasDeps()) { + List<ObservableFuture> deps = new LinkedList<ObservableFuture>(); + Iterator<ProxyRegTask> it = pending.iterator(); + while (it.hasNext()){ + ProxyRegTask c = it.next(); + if (t.dependsOn(c)) { + deps.add(c); + } + } + if (deps.isEmpty()){ + executor.submit(t); + } else { + DependencyLinker linker = new DependencyLinker(executor, deps, t); + linker.register(); + } + } else { + executor.submit(t); + } + return t; + } + + @Override + public void futureCompleted(Future e) { + pending.remove(e); + } + } /** Abstract base class from which all the sub-task classes are derived. */ private static abstract class JoinTask { @@ -1234,7 +1272,7 @@ public class JoinManager { if(this.proxyRegTask == null) { this.proxyRegTask = new ProxyRegTask(this,taskSeqN++); this.proxyRegTask.addObserver(this); - future = taskMgr.submit(this.proxyRegTask); + future = proxyRegTaskQueue.submit(this.proxyRegTask); }//endif }//end sync(taskList) synchronized (runningTasks){ @@ -1480,6 +1518,7 @@ public class JoinManager { * <code>TaskManager</code>. */ private final ExecutorService taskMgr; + private final ProxyRegTaskQueue proxyRegTaskQueue; /** Maximum number of times a failed task is allowed to be re-executed. */ private final int maxNRetries; /** Wakeup manager for the various tasks executed by this join manager. @@ -2580,7 +2619,7 @@ public class JoinManager { MAX_N_TASKS, /* Ignored */ 15, TimeUnit.SECONDS, - new PriorityBlockingQueue(100), /* Unbounded Queue */ + new LinkedBlockingQueue(), /* Unbounded Queue */ new NamedThreadFactory("JoinManager executor thread", false) ); } @@ -2670,6 +2709,7 @@ public class JoinManager { } }); + proxyRegTaskQueue = new ProxyRegTaskQueue(taskMgr); wakeupMgr = conf.wakeupManager; maxNRetries = conf.maxNretrys; leaseRenewalMgr = conf.leaseRenewalManager; Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=1563851&r1=1563850&r2=1563851&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Mon Feb 3 11:56:32 2014 @@ -1022,7 +1022,7 @@ public class ServiceDiscoveryManager { CacheTask t = w.getTask(); if(t.isFromProxy(reg)) { w.cancel(true); // Also causes task to be removed - } + } }//end loop }//end LookupCacheImpl.removeUselessTask } Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java?rev=1563851&r1=1563850&r2=1563851&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java Mon Feb 3 11:56:32 2014 @@ -12,6 +12,7 @@ import java.util.List; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; import java.util.concurrent.FutureTask; +import java.util.concurrent.RunnableFuture; import org.apache.river.api.util.FutureObserver; /** @@ -21,9 +22,9 @@ import org.apache.river.api.util.FutureO public class DependencyLinker implements FutureObserver { private final ExecutorService executor; private final List<ObservableFuture> tasks; - private final FutureTask dependant; + private final RunnableFuture dependant; - public DependencyLinker(ExecutorService ex, List<ObservableFuture> tasks, FutureTask dep) { + public DependencyLinker(ExecutorService ex, List<ObservableFuture> tasks, RunnableFuture dep) { executor = ex; this.tasks = new ArrayList<ObservableFuture>(tasks); dependant = dep;
