Synchonizer -> Synchronizer? D.
On Sun, Jun 10, 2012 at 6:42 PM, <[email protected]> wrote: > Author: simonw > Date: Sun Jun 10 16:42:55 2012 > New Revision: 1348623 > > URL: http://svn.apache.org/viewvc?rev=1348623&view=rev > Log: > LUCENE-4116: fix concurrency test for DWPTStallControl > > Modified: > lucene/dev/branches/branch_4x/ (props changed) > lucene/dev/branches/branch_4x/dev-tools/ (props changed) > lucene/dev/branches/branch_4x/lucene/ (props changed) > lucene/dev/branches/branch_4x/lucene/BUILD.txt (props changed) > lucene/dev/branches/branch_4x/lucene/CHANGES.txt (props changed) > lucene/dev/branches/branch_4x/lucene/JRE_VERSION_MIGRATION.txt (props > changed) > lucene/dev/branches/branch_4x/lucene/LICENSE.txt (props changed) > lucene/dev/branches/branch_4x/lucene/MIGRATE.txt (props changed) > lucene/dev/branches/branch_4x/lucene/NOTICE.txt (props changed) > lucene/dev/branches/branch_4x/lucene/README.txt (props changed) > lucene/dev/branches/branch_4x/lucene/analysis/ (props changed) > lucene/dev/branches/branch_4x/lucene/analysis/common/ (props changed) > > lucene/dev/branches/branch_4x/lucene/analysis/common/src/java/org/apache/lucene/analysis/standard/std31/package.html > (props changed) > > lucene/dev/branches/branch_4x/lucene/analysis/common/src/java/org/apache/lucene/analysis/standard/std34/package.html > (props changed) > lucene/dev/branches/branch_4x/lucene/backwards/ (props changed) > lucene/dev/branches/branch_4x/lucene/benchmark/ (props changed) > lucene/dev/branches/branch_4x/lucene/build.xml (props changed) > lucene/dev/branches/branch_4x/lucene/common-build.xml (props changed) > lucene/dev/branches/branch_4x/lucene/core/ (props changed) > > lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java > > lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java > lucene/dev/branches/branch_4x/lucene/demo/ (props changed) > lucene/dev/branches/branch_4x/lucene/facet/ (props changed) > lucene/dev/branches/branch_4x/lucene/grouping/ (props changed) > lucene/dev/branches/branch_4x/lucene/highlighter/ (props changed) > lucene/dev/branches/branch_4x/lucene/ivy-settings.xml (props changed) > lucene/dev/branches/branch_4x/lucene/join/ (props changed) > lucene/dev/branches/branch_4x/lucene/memory/ (props changed) > lucene/dev/branches/branch_4x/lucene/misc/ (props changed) > lucene/dev/branches/branch_4x/lucene/module-build.xml (props changed) > lucene/dev/branches/branch_4x/lucene/queries/ (props changed) > lucene/dev/branches/branch_4x/lucene/queryparser/ (props changed) > lucene/dev/branches/branch_4x/lucene/sandbox/ (props changed) > lucene/dev/branches/branch_4x/lucene/site/ (props changed) > lucene/dev/branches/branch_4x/lucene/spatial/ (props changed) > lucene/dev/branches/branch_4x/lucene/suggest/ (props changed) > lucene/dev/branches/branch_4x/lucene/test-framework/ (props changed) > lucene/dev/branches/branch_4x/lucene/tools/ (props changed) > lucene/dev/branches/branch_4x/solr/ (props changed) > lucene/dev/branches/branch_4x/solr/CHANGES.txt (props changed) > lucene/dev/branches/branch_4x/solr/LICENSE.txt (props changed) > lucene/dev/branches/branch_4x/solr/NOTICE.txt (props changed) > lucene/dev/branches/branch_4x/solr/README.txt (props changed) > lucene/dev/branches/branch_4x/solr/build.xml (props changed) > lucene/dev/branches/branch_4x/solr/cloud-dev/ (props changed) > lucene/dev/branches/branch_4x/solr/common-build.xml (props changed) > lucene/dev/branches/branch_4x/solr/contrib/ (props changed) > lucene/dev/branches/branch_4x/solr/core/ (props changed) > lucene/dev/branches/branch_4x/solr/dev-tools/ (props changed) > lucene/dev/branches/branch_4x/solr/example/ (props changed) > lucene/dev/branches/branch_4x/solr/lib/ (props changed) > lucene/dev/branches/branch_4x/solr/lib/httpclient-LICENSE-ASL.txt (props > changed) > lucene/dev/branches/branch_4x/solr/lib/httpclient-NOTICE.txt (props > changed) > lucene/dev/branches/branch_4x/solr/lib/httpcore-LICENSE-ASL.txt (props > changed) > lucene/dev/branches/branch_4x/solr/lib/httpcore-NOTICE.txt (props > changed) > lucene/dev/branches/branch_4x/solr/lib/httpmime-LICENSE-ASL.txt (props > changed) > lucene/dev/branches/branch_4x/solr/lib/httpmime-NOTICE.txt (props > changed) > lucene/dev/branches/branch_4x/solr/scripts/ (props changed) > lucene/dev/branches/branch_4x/solr/solrj/ (props changed) > lucene/dev/branches/branch_4x/solr/test-framework/ (props changed) > lucene/dev/branches/branch_4x/solr/testlogging.properties (props changed) > lucene/dev/branches/branch_4x/solr/webapp/ (props changed) > > Modified: > lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java > URL: > http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java?rev=1348623&r1=1348622&r2=1348623&view=diff > ============================================================================== > --- > lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java > (original) > +++ > lucene/dev/branches/branch_4x/lucene/core/src/java/org/apache/lucene/index/DocumentsWriterStallControl.java > Sun Jun 10 16:42:55 2012 > @@ -39,7 +39,6 @@ import org.apache.lucene.util.ThreadInte > final class DocumentsWriterStallControl { > @SuppressWarnings("serial") > private static final class Sync extends AbstractQueuedSynchronizer { > - volatile boolean hasBlockedThreads = false; // only with assert > > Sync() { > setState(0); > @@ -67,15 +66,10 @@ final class DocumentsWriterStallControl > > @Override > public int tryAcquireShared(int acquires) { > - assert maybeSetHasBlocked(getState()); > return getState() == 0 ? 1 : -1; > } > > - // only used for testing > - private boolean maybeSetHasBlocked(int state) { > - hasBlockedThreads |= getState() != 0; > - return true; > - } > + > > @Override > public boolean tryReleaseShared(int newState) { > @@ -130,7 +124,7 @@ final class DocumentsWriterStallControl > } > > boolean hasBlocked() { // for tests > - return sync.hasBlockedThreads; > + return sync.hasQueuedThreads(); > } > > static interface MemoryController { > @@ -138,4 +132,12 @@ final class DocumentsWriterStallControl > long flushBytes(); > long stallLimitBytes(); > } > + > + public boolean isHealthy() { > + return sync.isHealthy(); > + } > + > + public boolean isThreadQueued(Thread t) { > + return sync.isQueued(t); > + } > } > > Modified: > lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java > URL: > http://svn.apache.org/viewvc/lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java?rev=1348623&r1=1348622&r2=1348623&view=diff > ============================================================================== > --- > lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java > (original) > +++ > lucene/dev/branches/branch_4x/lucene/core/src/test/org/apache/lucene/index/TestDocumentsWriterStallControl.java > Sun Jun 10 16:42:55 2012 > @@ -127,22 +127,19 @@ public class TestDocumentsWriterStallCon > int numStallers = atLeast(1); > int numReleasers = atLeast(1); > int numWaiters = atLeast(1); > - > - final CountDownLatch[] latches = new CountDownLatch[] { > - new CountDownLatch(numStallers + numReleasers), new > CountDownLatch(1), > - new CountDownLatch(numWaiters)}; > + final Synchonizer sync = new Synchonizer(numStallers + numReleasers, > numStallers + numReleasers+numWaiters); > Thread[] threads = new Thread[numReleasers + numStallers + numWaiters]; > List<Throwable> exceptions = Collections.synchronizedList(new > ArrayList<Throwable>()); > for (int i = 0; i < numReleasers; i++) { > - threads[i] = new Updater(stop, checkPoint, ctrl, latches, true, > exceptions); > + threads[i] = new Updater(stop, checkPoint, ctrl, sync, true, > exceptions); > } > for (int i = numReleasers; i < numReleasers + numStallers; i++) { > - threads[i] = new Updater(stop, checkPoint, ctrl, latches, false, > exceptions); > + threads[i] = new Updater(stop, checkPoint, ctrl, sync, false, > exceptions); > > } > for (int i = numReleasers + numStallers; i < numReleasers + numStallers > + numWaiters; i++) { > - threads[i] = new Waiter(stop, checkPoint, ctrl, latches, exceptions); > + threads[i] = new Waiter(stop, checkPoint, ctrl, sync, exceptions); > > } > > @@ -151,7 +148,7 @@ public class TestDocumentsWriterStallCon > for (int i = 0; i < iters; i++) { > if (checkPoint.get()) { > > - assertTrue("timed out waiting for update threads - deadlock?", > latches[0].await(10, TimeUnit.SECONDS)); > + assertTrue("timed out waiting for update threads - deadlock?", > sync.updateJoin.await(10, TimeUnit.SECONDS)); > if (!exceptions.isEmpty()) { > for (Throwable throwable : exceptions) { > throwable.printStackTrace(); > @@ -159,27 +156,38 @@ public class TestDocumentsWriterStallCon > fail("got exceptions in threads"); > } > > - if (!ctrl.anyStalledThreads()) { > - assertTrue( > - "control claims no stalled threads but waiter seems to be > blocked", > - latches[2].await(10, TimeUnit.SECONDS)); > - } > - checkPoint.set(false); > + if (ctrl.hasBlocked() && ctrl.isHealthy()) { > + assertState(numReleasers, numStallers, numWaiters, threads, ctrl); > + > + > + } > > - latches[1].countDown(); > + checkPoint.set(false); > + sync.waiter.countDown(); > + sync.leftCheckpoint.await(); > } > assertFalse(checkPoint.get()); > + assertEquals(0, sync.waiter.getCount()); > if (random().nextInt(2) == 0) { > - latches[0] = new CountDownLatch(numStallers + numReleasers); > - latches[1] = new CountDownLatch(1); > - latches[2] = new CountDownLatch(numWaiters); > + sync.reset(numStallers + numReleasers, numStallers + numReleasers > + + numWaiters); > checkPoint.set(true); > } > > } > + if (!checkPoint.get()) { > + sync.reset(numStallers + numReleasers, numStallers + numReleasers > + + numWaiters); > + checkPoint.set(true); > + } > > + assertTrue(sync.updateJoin.await(10, TimeUnit.SECONDS)); > + assertState(numReleasers, numStallers, numWaiters, threads, ctrl); > + checkPoint.set(false); > stop.set(true); > - latches[1].countDown(); > + sync.waiter.countDown(); > + sync.leftCheckpoint.await(); > + > > for (int i = 0; i < threads.length; i++) { > memCtrl.limit = 1000; > @@ -196,20 +204,45 @@ public class TestDocumentsWriterStallCon > } > } > > + private void assertState(int numReleasers, int numStallers, int > numWaiters, Thread[] threads, DocumentsWriterStallControl ctrl) throws > InterruptedException { > + int millisToSleep = 100; > + while (true) { > + if (ctrl.hasBlocked() && ctrl.isHealthy()) { > + for (int n = numReleasers + numStallers; n < numReleasers > + + numStallers + numWaiters; n++) { > + if (ctrl.isThreadQueued(threads[n])) { > + if (millisToSleep < 60000) { > + Thread.sleep(millisToSleep); > + millisToSleep *=2; > + break; > + } else { > + fail("control claims no stalled threads but waiter seems to be > blocked "); > + } > + } > + } > + break; > + } else { > + break; > + } > + } > + > + } > + > public static class Waiter extends Thread { > - private CountDownLatch[] latches; > + private Synchonizer sync; > private DocumentsWriterStallControl ctrl; > private AtomicBoolean checkPoint; > private AtomicBoolean stop; > private List<Throwable> exceptions; > > public Waiter(AtomicBoolean stop, AtomicBoolean checkPoint, > - DocumentsWriterStallControl ctrl, CountDownLatch[] latches, > + DocumentsWriterStallControl ctrl, Synchonizer sync, > List<Throwable> exceptions) { > + super("waiter"); > this.stop = stop; > this.checkPoint = checkPoint; > this.ctrl = ctrl; > - this.latches = latches; > + this.sync = sync; > this.exceptions = exceptions; > } > > @@ -218,13 +251,10 @@ public class TestDocumentsWriterStallCon > while (!stop.get()) { > ctrl.waitIfStalled(); > if (checkPoint.get()) { > - CountDownLatch join = latches[2]; > - CountDownLatch wait = latches[1]; > - join.countDown(); > try { > - assertTrue(wait.await(10, TimeUnit.SECONDS)); > + assertTrue(sync.await()); > } catch (InterruptedException e) { > - System.out.println("[Waiter] got interrupted - wait count: " + > wait.getCount()); > + System.out.println("[Waiter] got interrupted - wait count: " + > sync.waiter.getCount()); > throw new ThreadInterruptedException(e); > } > } > @@ -238,7 +268,7 @@ public class TestDocumentsWriterStallCon > > public static class Updater extends Thread { > > - private CountDownLatch[] latches; > + private Synchonizer sync; > private DocumentsWriterStallControl ctrl; > private AtomicBoolean checkPoint; > private AtomicBoolean stop; > @@ -246,12 +276,13 @@ public class TestDocumentsWriterStallCon > private List<Throwable> exceptions; > > public Updater(AtomicBoolean stop, AtomicBoolean checkPoint, > - DocumentsWriterStallControl ctrl, CountDownLatch[] latches, > + DocumentsWriterStallControl ctrl, Synchonizer sync, > boolean release, List<Throwable> exceptions) { > + super("updater"); > this.stop = stop; > this.checkPoint = checkPoint; > this.ctrl = ctrl; > - this.latches = latches; > + this.sync = sync; > this.release = release; > this.exceptions = exceptions; > } > @@ -268,22 +299,24 @@ public class TestDocumentsWriterStallCon > ctrl.updateStalled(memCtrl); > } > if (checkPoint.get()) { > - CountDownLatch join = latches[0]; > - CountDownLatch wait = latches[1]; > - join.countDown(); > + sync.updateJoin.countDown(); > try { > - assertTrue(wait.await(10, TimeUnit.SECONDS)); > + assertTrue(sync.await()); > } catch (InterruptedException e) { > - System.out.println("[Updater] got interrupted - wait count: " > + wait.getCount()); > + System.out.println("[Updater] got interrupted - wait count: " > + sync.waiter.getCount()); > throw new ThreadInterruptedException(e); > } > + sync.leftCheckpoint.countDown(); > + } > + if (random().nextBoolean()) { > + Thread.yield(); > } > - Thread.yield(); > } > } catch (Throwable e) { > e.printStackTrace(); > exceptions.add(e); > } > + sync.updateJoin.countDown(); > } > > } > @@ -366,4 +399,25 @@ public class TestDocumentsWriterStallCon > } > > } > + > + private static final class Synchonizer { > + volatile CountDownLatch waiter; > + volatile CountDownLatch updateJoin; > + volatile CountDownLatch leftCheckpoint; > + > + public Synchonizer(int numUpdater, int numThreads) { > + reset(numUpdater, numThreads); > + } > + > + public void reset(int numUpdaters, int numThreads) { > + this.waiter = new CountDownLatch(1); > + this.updateJoin = new CountDownLatch(numUpdaters); > + this.leftCheckpoint = new CountDownLatch(numUpdaters); > + } > + > + public boolean await() throws InterruptedException { > + return waiter.await(10, TimeUnit.SECONDS); > + } > + > + } > } > > --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
