Author: peter_firmstone Date: Tue Feb 18 10:42:59 2014 New Revision: 1569259
URL: http://svn.apache.org/r1569259 Log: Add some latency into ServiceDiscoveryManager's ExecutorService to simulate TaskManager's contention, sufficient to allow wiring up of dependencies between tasks. Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/AddListenerEvent.java river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/ReRegisterGoodEquals.java river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/AddListenerEvent.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/AddListenerEvent.java?rev=1569259&r1=1569258&r2=1569259&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/AddListenerEvent.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/AddListenerEvent.java Tue Feb 18 10:42:59 2014 @@ -32,6 +32,7 @@ import java.util.logging.Level; import com.sun.jini.qa.harness.QAConfig; import com.sun.jini.qa.harness.Test; +import com.sun.jini.test.share.LookupServices; /** * This class verifies that bug 4712396 has been fixed. As stated in the @@ -62,8 +63,8 @@ public class AddListenerEvent extends Ab protected SDMListener[] sdmListener = new SDMListener[nListeners]; public static class SDMListener extends AbstractBaseTest.SrvcListener { - String testName; - int listenerIndx; + final String testName; + final int listenerIndx; public SDMListener(QAConfig config, String classname, int listenerIndx) { super(config,classname); this.testName = classname; @@ -117,9 +118,10 @@ public class AddListenerEvent extends Ab */ protected void applyTestDef() throws Exception { /* Register new proxies */ - int nServices = getLookupServices().getnServices(); - int nAttributes = getLookupServices().getnAttributes(); - int nSecsServiceDiscovery = getLookupServices().getnSecsServiceDiscovery(); + LookupServices lookupServices = getLookupServices(); + int nServices = lookupServices.getnServices(); + int nAttributes = lookupServices.getnAttributes(); + int nSecsServiceDiscovery = lookupServices.getnSecsServiceDiscovery(); registerServices(0,nServices,nAttributes,testServiceType); /* Create a cache for the service that was registered; register * the first listener to receive service discovery events. Modified: river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/ReRegisterGoodEquals.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/ReRegisterGoodEquals.java?rev=1569259&r1=1569258&r2=1569259&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/ReRegisterGoodEquals.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/qa/src/com/sun/jini/test/impl/servicediscovery/event/ReRegisterGoodEquals.java Tue Feb 18 10:42:59 2014 @@ -68,10 +68,11 @@ public class ReRegisterGoodEquals extend */ public Test construct(QAConfig config) throws Exception { super.construct(config); - testDesc = ""+getnLookupServices()+" lookup service(s), "+getnServices() + int nServices = getnServices(); + testDesc = ""+getnLookupServices()+" lookup service(s), " + nServices +" service(s) with well-defined equals() method"; - nAddedExpected = getnServices()*2; - nRemovedExpected = nAddedExpected-getnServices(); + nAddedExpected = nServices*2; + nRemovedExpected = nAddedExpected-nServices; testServiceType = AbstractBaseTest.TEST_SERVICE; return this; }//end construct Modified: river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java?rev=1569259&r1=1569258&r2=1569259&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/com/sun/jini/reggie/RegistrarImpl.java Tue Feb 18 10:42:59 2014 @@ -544,24 +544,6 @@ class RegistrarImpl implements Registrar unicastDiscoveryConstraints = init.unicastDiscoveryConstraints; context = init.context; eventNotifierExec = new SynchronousExecutors(init.scheduledExecutor); - -// new ExtensibleExecutorService(init.executor, -// new RunnableFutureFactory(){ -// -// @Override -// public <T> RunnableFuture<T> newTaskFor(Runnable r, T value) { -// if (r instanceof ObservableFutureTask) return (RunnableFuture<T>) r; -// return new ObservableFutureTask(r, value); -// } -// -// @Override -// public <T> RunnableFuture<T> newTaskFor(Callable<T> c) { -// if (c instanceof ObservableFutureTask) return (RunnableFuture<T>) c; -// return new ObservableFutureTask(c); -// } -// -// }); -// eventTaskQueue = new EventTaskQueue(eventNotifierExec); eventTaskMap = new TreeMap<EventReg,ExecutorService>(); discoveryResponseExec = init.executor; ReliableLog log = null; @@ -2049,109 +2031,7 @@ class RegistrarImpl implements Registrar reg = null; } } - -// private static class DependencyLinker implements FutureObserver { -// private final ExecutorService executor; -// private final ObservableFuture precedent; -// private final FutureTask dependant; -// -// public DependencyLinker(ExecutorService ex, ObservableFuture precedent, FutureTask dep) { -// executor = ex; -// this.precedent = precedent; -// dependant = dep; -// } -// -// public synchronized void register() { -// precedent.addObserver(this); -// } -// -// @Override -// public synchronized void futureCompleted(Future e) { -// if (precedent.equals(e)) executor.submit(dependant); -// } -// -// } -// /** -// * -// */ -// static final class EventTaskWrapper extends ObservableFutureTask -// { -// private final EventTask task; -// -// private EventTaskWrapper(Runnable r, Object result) { -// super(r, result); -// if (r instanceof EventTask) task = (EventTask) r; -// else task = null; -// } -// -// private EventTaskWrapper(Callable c) -// { -// super(c); -// task = null; -// } -// -// private EventTask getTask(){ -// return task; -// } -// } -// -// private static class EventTaskWrapperComparator -// implements Comparator<EventTaskWrapper> { -// -// @Override -// public int compare(EventTaskWrapper o1, EventTaskWrapper o2) { -// EventTask t1 = o1.getTask(); -// EventTask t2 = o2.getTask(); -// if (t1.seqNo < t2.seqNo) return -1; -// if (t1.seqNo > t2.seqNo) return 1; -// return 0; -// } -// -// } -// -// private static final class EventTaskQueue implements FutureObserver { -// // CacheTasks pending completion. -// private final List<EventTaskWrapper> pending; -// private final ExecutorService executor; -// -// private EventTaskQueue(ExecutorService e){ -// this.pending = new ArrayList<EventTaskWrapper>(200); -// executor = e; -// } -// -// private EventTaskWrapper submit(EventTask t){ -// EventTaskWrapper future = new EventTaskWrapper(t, null); -// EventTaskWrapper precedent = null; -// synchronized (pending){ -// pending.add(future); -// future.addObserver(this); -// Iterator<EventTaskWrapper> it = pending.iterator(); -// while (it.hasNext()){ -// EventTaskWrapper w = it.next(); -// EventTask c = w.getTask(); -// if (t.dependsOn(c)) { -// precedent = w; -// break; -// } -// } -// } -// if (precedent == null){ -// executor.submit(future); -// } else { -// DependencyLinker linker = new DependencyLinker(executor, precedent, future); -// linker.register(); -// } -// return future; -// } -// -// @Override -// public void futureCompleted(Future e) { -// synchronized (pending){ -// pending.remove(e); -// } -// } -// } -// + /** An event to be sent, and the listener to send it to. */ private static final class EventTask implements Callable<Boolean>, Comparable<EventTask> { Modified: river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java?rev=1569259&r1=1569258&r2=1569259&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/net/jini/lookup/ServiceDiscoveryManager.java Tue Feb 18 10:42:59 2014 @@ -984,6 +984,11 @@ public class ServiceDiscoveryManager { CacheTaskWrapper future = new CacheTaskWrapper(t, null); pending.offer(future); future.addObserver(this); + try { + Thread.sleep(100L); //Brief sleep to allow deps to find each other. + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); // restore. + } if (t.hasDeps()) { List<ObservableFuture> deps = new ArrayList<ObservableFuture>(); Iterator<CacheTaskWrapper> it = pending.iterator(); Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java?rev=1569259&r1=1569258&r2=1569259&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/DependencyLinker.java Tue Feb 18 10:42:59 2014 @@ -31,9 +31,14 @@ public class DependencyLinker implements } public synchronized void register() { - Iterator<ObservableFuture> it = tasks.iterator(); - while (it.hasNext()) { - it.next().addObserver(this); + for (int i = 0; i < tasks.size(); i++){ + ObservableFuture f = null; + try { + f = tasks.get(i); + } catch (IndexOutOfBoundsException e){ + continue; + } + if (f != null) f.addObserver(this); } } Modified: river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java URL: http://svn.apache.org/viewvc/river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java?rev=1569259&r1=1569258&r2=1569259&view=diff ============================================================================== --- river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java (original) +++ river/jtsk/skunk/qa_refactor/trunk/src/org/apache/river/impl/thread/SynchronousExecutors.java Tue Feb 18 10:42:59 2014 @@ -440,17 +440,20 @@ public class SynchronousExecutors implem reschedule = true; throw e; } finally { - if (reschedule) { - attempt++; - queue.peek = null; // set peek to null to unblock queue. - executorLock.lock(); - try { - waiting.signalAll(); - } finally { - executorLock.unlock(); + try { + if (reschedule) { + attempt++; + queue.peek = null; // set peek to null to unblock queue. + executorLock.lock(); + try { + waiting.signalAll(); + } finally { + executorLock.unlock(); + } } + } finally { + queue.lock.unlock(); } - queue.lock.unlock(); } }
